From 19044afed48aa8b5a4a53f1a722f2d193bc0da4f Mon Sep 17 00:00:00 2001
From: Chris Hostetter
Date: Tue, 24 May 2016 13:30:13 -0700
Subject: [PATCH] SOLR-9159: New cloud based concurrent atomic update test

---
 solr/CHANGES.txt                                   |   2 +
 .../conf/schema-minimal-atomic-stress.xml          |  38 ++
 .../TestStressCloudBlindAtomicUpdates.java         | 483 ++++++++++++++++++
 3 files changed, 523 insertions(+)
 create mode 100644 solr/core/src/test-files/solr/collection1/conf/schema-minimal-atomic-stress.xml
 create mode 100644 solr/core/src/test/org/apache/solr/cloud/TestStressCloudBlindAtomicUpdates.java

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 437c48cebff..41b655915b1 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -325,6 +325,8 @@ Other Changes
 
 * SOLR-9131: Fix "start solr" text in cluster.vm Velocity template (janhoy)
 
+* SOLR-9159: New cloud based concurrent atomic update test (hossman)
+
 ================== 6.0.1 ==================
 
 (No Changes)

diff --git a/solr/core/src/test-files/solr/collection1/conf/schema-minimal-atomic-stress.xml b/solr/core/src/test-files/solr/collection1/conf/schema-minimal-atomic-stress.xml
new file mode 100644
index 00000000000..dffa365fd27
--- /dev/null
+++ b/solr/core/src/test-files/solr/collection1/conf/schema-minimal-atomic-stress.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<schema name="minimal-atomic-stress" version="1.6">
+
+  <uniqueKey>id</uniqueKey>
+
+  <field name="id" type="string" indexed="true" stored="true"/>
+  <field name="_version_" type="long" indexed="false" stored="false" docValues="true"/>
+
+  <!-- each test method expects exactly one of these field/attribute combinations -->
+  <field name="long_dv"            type="long" indexed="false" stored="false" docValues="true"/>
+  <field name="long_dv_stored"     type="long" indexed="false" stored="true"  docValues="true"/>
+  <field name="long_dv_stored_idx" type="long" indexed="true"  stored="true"  docValues="true"/>
+  <field name="long_dv_idx"        type="long" indexed="true"  stored="false" docValues="true"/>
+  <field name="long_stored_idx"    type="long" indexed="true"  stored="true"  docValues="false"/>
+
+  <fieldType name="string" class="solr.StrField"/>
+
+  <!-- the test sanity checks these exact (all default) attributes on this type -->
+  <fieldType name="long" class="solr.TrieLongField" multiValued="false" indexed="false" stored="false" docValues="false"/>
+
+</schema>

diff --git a/solr/core/src/test/org/apache/solr/cloud/TestStressCloudBlindAtomicUpdates.java b/solr/core/src/test/org/apache/solr/cloud/TestStressCloudBlindAtomicUpdates.java
new file mode 100644
index 00000000000..9e072efcde1
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestStressCloudBlindAtomicUpdates.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.client.solrj.request.schema.SchemaRequest.Field;
+import org.apache.solr.client.solrj.request.schema.SchemaRequest.FieldType;
+import org.apache.solr.client.solrj.response.schema.SchemaResponse.FieldResponse;
+import org.apache.solr.client.solrj.response.schema.SchemaResponse.FieldTypeResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.TestInjection;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.TestUtil;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stress test of Atomic Updates in a MiniSolrCloud cluster.
+ *
+ * The focus of the test is parallel threads hammering updates on different docs using random
+ * clients/nodes.  Optimistic concurrency is not used here because of SOLR-8733; instead we just
+ * throw lots of "inc" operations at a numeric field and check that the math works out at the end.
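+ *
+ * Each worker thread effectively sends updates of the form
+ * <code>{"id":"N", "long_dv":{"inc":delta}}</code>, recording every delta in a shared
+ * per-doc counter, so the expected final value of every tested doc is known client side.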
+ */
+@Slow
+@SuppressSSL(bugUrl="SSL overhead seems to cause OutOfMemory when stress testing")
+public class TestStressCloudBlindAtomicUpdates extends SolrCloudTestCase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final String DEBUG_LABEL = MethodHandles.lookup().lookupClass().getName();
+  private static final String COLLECTION_NAME = "test_col";
+
+  /** A basic client for operations at the cloud level, default collection will be set */
+  private static CloudSolrClient CLOUD_CLIENT;
+  /** One client per node */
+  private static ArrayList<HttpSolrClient> CLIENTS = new ArrayList<>(5);
+
+  /** Service to execute all parallel work
+   * @see #NUM_THREADS
+   */
+  private static ExecutorService EXEC_SERVICE;
+
+  /** num parallel threads in use by {@link #EXEC_SERVICE} */
+  private static int NUM_THREADS;
+
+  /**
+   * Used as an increment and multiplier when deciding how many docs should be in
+   * the test index.  1 means every doc in the index is a candidate for updates, bigger numbers
+   * mean a larger index is used (so tested docs are more likely to be spread out in multiple
+   * segments)
+   */
+  private static int DOC_ID_INCR;
+
+  @BeforeClass
+  private static void createMiniSolrCloudCluster() throws Exception {
+    // NOTE: numDocsToCheck uses atLeast, so nightly & multiplier are already a factor in
+    // index size -- no need to redundantly factor them in here as well
+    DOC_ID_INCR = TestUtil.nextInt(random(), 1, 7);
+
+    NUM_THREADS = atLeast(3);
+    EXEC_SERVICE = ExecutorUtil.newMDCAwareFixedThreadPool
+      (NUM_THREADS, new DefaultSolrThreadFactory(DEBUG_LABEL));
+
+    // at least 2, but don't go crazy on nightly/test.multiplier with "atLeast()"
+    final int numShards = TEST_NIGHTLY ? 5 : 2;
+    final int repFactor = 2;
+    final int numNodes = numShards * repFactor;
+
+    final String configName = DEBUG_LABEL + "_config-set";
+    final Path configDir = Paths.get(TEST_HOME(), "collection1", "conf");
+
+    configureCluster(numNodes).addConfig(configName, configDir).configure();
+
+    Map<String, String> collectionProperties = new HashMap<>();
+    collectionProperties.put("config", "solrconfig-tlog.xml");
+    collectionProperties.put("schema", "schema-minimal-atomic-stress.xml");
+
+    assertNotNull(cluster.createCollection(COLLECTION_NAME, numShards, repFactor,
+                                           configName, null, null, collectionProperties));
+
+    CLOUD_CLIENT = cluster.getSolrClient();
+    CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
+
+    waitForRecoveriesToFinish(CLOUD_CLIENT);
+
+    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      CLIENTS.add(getHttpSolrClient(jetty.getBaseUrl() + "/" + COLLECTION_NAME + "/"));
+    }
+
+    // sanity check no one broke the assumptions we make about our schema
+    checkExpectedSchemaType( map("name","long",
+                                 "class","solr.TrieLongField",
+                                 "multiValued",Boolean.FALSE,
+                                 "indexed",Boolean.FALSE,
+                                 "stored",Boolean.FALSE,
+                                 "docValues",Boolean.FALSE) );
+  }
+
+  @AfterClass
+  private static void afterClass() throws Exception {
+    TestInjection.reset();
+    ExecutorUtil.shutdownAndAwaitTermination(EXEC_SERVICE);
+    EXEC_SERVICE = null;
+    CLOUD_CLIENT.close(); CLOUD_CLIENT = null;
+    for (HttpSolrClient client : CLIENTS) {
+      client.close();
+    }
+    CLIENTS = null;
+  }
+
+  @Before
+  private void clearCloudCollection() throws Exception {
+    assertEquals(0, CLOUD_CLIENT.deleteByQuery("*:*").getStatus());
+    assertEquals(0, CLOUD_CLIENT.optimize().getStatus());
+
+    TestInjection.reset();
+
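+    // each TestInjection hook below takes a string of the form "boolean:percentage" --
+    // "false:0" disables the hook entirely, while "true:N" triggers the injected
+    // failure/pause on roughly N percent of the requests that reach it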
+    final int injectionPercentage = (int)Math.ceil(atLeast(1) / 2.0);
+    final String testInjection = usually() ? "false:0" : ("true:" + injectionPercentage);
+    log.info("TestInjection: fail replica, update pause, tlog pauses: " + testInjection);
+    TestInjection.failReplicaRequests = testInjection;
+    TestInjection.updateLogReplayRandomPause = testInjection;
+    TestInjection.updateRandomPause = testInjection;
+  }
+
+
+  public void test_dv() throws Exception {
+    String field = "long_dv";
+    checkExpectedSchemaField(map("name", field,
+                                 "type","long",
+                                 "stored",Boolean.FALSE,
+                                 "indexed",Boolean.FALSE,
+                                 "docValues",Boolean.TRUE));
+
+    checkField(field);
+  }
+
+  public void test_dv_stored() throws Exception {
+    String field = "long_dv_stored";
+    checkExpectedSchemaField(map("name", field,
+                                 "type","long",
+                                 "stored",Boolean.TRUE,
+                                 "indexed",Boolean.FALSE,
+                                 "docValues",Boolean.TRUE));
+
+    checkField(field);
+  }
+
+  public void test_dv_stored_idx() throws Exception {
+    String field = "long_dv_stored_idx";
+    checkExpectedSchemaField(map("name", field,
+                                 "type","long",
+                                 "stored",Boolean.TRUE,
+                                 "indexed",Boolean.TRUE,
+                                 "docValues",Boolean.TRUE));
+
+    checkField(field);
+  }
+
+  public void test_dv_idx() throws Exception {
+    String field = "long_dv_idx";
+    checkExpectedSchemaField(map("name", field,
+                                 "type","long",
+                                 "stored",Boolean.FALSE,
+                                 "indexed",Boolean.TRUE,
+                                 "docValues",Boolean.TRUE));
+
+    checkField(field);
+  }
+
+  public void test_stored_idx() throws Exception {
+    String field = "long_stored_idx";
+    checkExpectedSchemaField(map("name", field,
+                                 "type","long",
+                                 "stored",Boolean.TRUE,
+                                 "indexed",Boolean.TRUE,
+                                 "docValues",Boolean.FALSE));
+
+    checkField(field);
+  }
+
+  public void checkField(final String numericFieldName) throws Exception {
+
+    final CountDownLatch abortLatch = new CountDownLatch(1);
+
+    final int numDocsToCheck = atLeast(37);
+    final int numDocsInIndex = (numDocsToCheck * DOC_ID_INCR);
+    final AtomicLong[] expected = new AtomicLong[numDocsToCheck];
+
+    log.info("Testing " + numericFieldName + ": numDocsToCheck=" + numDocsToCheck +
+             ", numDocsInIndex=" + numDocsInIndex + ", incr=" + DOC_ID_INCR);
+
+    // seed the index & keep track of what docs exist and with what values
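+    // (only every DOC_ID_INCR-th doc gets a counter in expected[]; the docs in between
+    //  are never updated, and exist only to spread the tested docs across more segments)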
+    for (int id = 0; id < numDocsInIndex; id++) {
+      // NOTE: the field we're mutating is a long, but we seed with a random int,
+      // and we will inc/dec by random smaller ints, to ensure we never over/under flow
+      final int initValue = random().nextInt();
+      SolrInputDocument doc = doc(f("id",""+id), f(numericFieldName, initValue));
+      UpdateResponse rsp = update(doc).process(CLOUD_CLIENT);
+      assertEquals(doc.toString() + " => " + rsp.toString(), 0, rsp.getStatus());
+      if (0 == id % DOC_ID_INCR) {
+        expected[(int)(id / DOC_ID_INCR)] = new AtomicLong(initValue);
+      }
+    }
+    assertNotNull("Sanity Check no off-by-one in expected init: ", expected[expected.length-1]);
+
+    // sanity check index contents
+    assertEquals(0, CLOUD_CLIENT.commit().getStatus());
+    assertEquals(numDocsInIndex,
+                 CLOUD_CLIENT.query(params("q", "*:*")).getResults().getNumFound());
+
+    // spin up parallel workers to hammer updates
+    List<Future<Worker>> results = new ArrayList<>(NUM_THREADS);
+    for (int workerId = 0; workerId < NUM_THREADS; workerId++) {
+      Worker worker = new Worker(workerId, expected, abortLatch, new Random(random().nextLong()),
+                                 numericFieldName);
+      // ask for the Worker to be returned in the Future so we can inspect it
+      results.add(EXEC_SERVICE.submit(worker, worker));
+    }
+    // check the results of all our workers
+    for (Future<Worker> r : results) {
+      try {
+        Worker w = r.get();
+        if (! w.getFinishedOk() ) {
+          // quick and dirty sanity check if any workers didn't succeed, but didn't throw an exception either
+          abortLatch.countDown();
+          log.error("worker={} didn't finish ok, but didn't throw exception?", w.workerId);
+        }
+      } catch (ExecutionException ee) {
+        Throwable rootCause = ee.getCause();
+        if (rootCause instanceof Error) {
+          // low level error, or test assertion failure - either way don't leave it wrapped
+          log.error("Worker exec Error, throwing root cause", ee);
+          throw (Error) rootCause;
+        } else {
+          log.error("Worker ExecutionException, re-throwing", ee);
+          throw ee;
+        }
+      }
+    }
+
+    assertEquals("Abort latch has changed, why didn't we get an exception from a worker?",
+                 1L, abortLatch.getCount());
+
+    TestInjection.reset();
+    waitForRecoveriesToFinish(CLOUD_CLIENT);
+
+    // check all the final index contents match our expectations
+    int incorrectDocs = 0;
+    for (int id = 0; id < numDocsInIndex; id += DOC_ID_INCR) {
+      assert 0 == id % DOC_ID_INCR : "WTF? " + id;
+
+      final long expect = expected[(int)(id / DOC_ID_INCR)].longValue();
+
+      final String docId = "" + id;
+
+      // sometimes include an fq on the expected value to ensure the updated values
+      // are "visible" for searching
+      final SolrParams p = (0 != TestUtil.nextInt(random(), 0,15))
+        ? params() : params("fq", numericFieldName + ":" + expect);
+      SolrDocument doc = getRandClient(random()).getById(docId, p);
+
+      final boolean foundWithFilter = (null != doc);
+      if (! foundWithFilter) {
+        // try again w/o fq to see what it does have
+        doc = getRandClient(random()).getById(docId);
+      }
+
+      Long actual = (null == doc) ? null : (Long) doc.getFirstValue(numericFieldName);
+      if (actual == null || expect != actual.longValue() || ! foundWithFilter) {
+        log.error("docId={}, foundWithFilter={}, expected={}, actual={}",
+                  docId, foundWithFilter, expect, actual);
+        incorrectDocs++;
+      }
+    }
+    assertEquals("Some docs had errors -- check logs", 0, incorrectDocs);
+  }
+
+
+  public static final class Worker implements Runnable {
+    public final int workerId;
+    final AtomicLong[] expected;
+    final CountDownLatch abortLatch;
+    final Random rand;
+    final String updateField;
+    final int numDocsToUpdate;
+    boolean ok = false; // set to true only on successful completion
+
+    public Worker(int workerId, AtomicLong[] expected, CountDownLatch abortLatch, Random rand,
+                  String updateField) {
+      this.workerId = workerId;
+      this.expected = expected;
+      this.abortLatch = abortLatch;
+      this.rand = rand;
+      this.updateField = updateField;
+      this.numDocsToUpdate = atLeast(rand, 25);
+    }
+
+    public boolean getFinishedOk() {
+      return ok;
+    }
+
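+    /**
+     * Sends a single atomic "inc" update for the specified doc to a random client,
+     * and records the delta in the shared (expected) counter for that doc.
+     */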
" + docId; + + final int delta = TestUtil.nextInt(rand, -1000, 1000); + log.info("worker={}, docId={}, delta={}", workerId, docId, delta); + + SolrClient client = getRandClient(rand); + SolrInputDocument doc = doc(f("id",""+docId), + f(updateField,Collections.singletonMap("inc",delta))); + UpdateResponse rsp = update(doc).process(client); + assertEquals(doc + " => " + rsp, 0, rsp.getStatus()); + + AtomicLong counter = expected[(int)(docId / DOC_ID_INCR)]; + assertNotNull("null counter for " + docId + "/" + DOC_ID_INCR, counter); + counter.getAndAdd(delta); + + } + + public void run() { + final String origThreadName = Thread.currentThread().getName(); + try { + Thread.currentThread().setName(origThreadName + "-w" + workerId); + final int maxDocMultiplier = expected.length-1; + for (int docIter = 0; docIter < numDocsToUpdate; docIter++) { + + final int docId = DOC_ID_INCR * TestUtil.nextInt(rand, 0, maxDocMultiplier); + + // tweak our thread name to keep track of what we're up to + Thread.currentThread().setName(origThreadName + "-w" + workerId + "-d" + docId); + + // no matter how random the doc selection may be per thread, ensure + // every doc that is selected by *a* thread gets at least a couple rapid fire updates + final int itersPerDoc = atLeast(rand, 2); + + for (int updateIter = 0; updateIter < itersPerDoc; updateIter++) { + if (0 == abortLatch.getCount()) { + return; + } + doRandomAtomicUpdate(docId); + } + if (rand.nextBoolean()) { Thread.yield(); } + } + + } catch (Error err) { + log.error(Thread.currentThread().getName(), err); + abortLatch.countDown(); + throw err; + } catch (Exception ex) { + log.error(Thread.currentThread().getName(), ex); + abortLatch.countDown(); + throw new RuntimeException(ex.getMessage(), ex); + } finally { + Thread.currentThread().setName(origThreadName); + } + ok = true; + } + } + + + public static UpdateRequest update(SolrInputDocument... docs) { + return update(null, docs); + } + public static UpdateRequest update(SolrParams params, SolrInputDocument... docs) { + UpdateRequest r = new UpdateRequest(); + if (null != params) { + r.setParams(new ModifiableSolrParams(params)); + } + r.add(Arrays.asList(docs)); + return r; + } + + public static SolrInputDocument doc(SolrInputField... fields) { + SolrInputDocument doc = new SolrInputDocument(); + for (SolrInputField f : fields) { + doc.put(f.getName(), f); + } + return doc; + } + + public static SolrInputField f(String fieldName, Object... values) { + SolrInputField f = new SolrInputField(fieldName); + f.setValue(values, 1.0F); + // TODO: soooooooooo stupid (but currently neccessary because atomic updates freak out + // if the Map with the "inc" operation is inside of a collection - even if it's the only "value") ... + if (1 == values.length) { + f.setValue(values[0], 1.0F); + } else { + f.setValue(values, 1.0F); + } + return f; + } + + public static SolrClient getRandClient(Random rand) { + int numClients = CLIENTS.size(); + int idx = TestUtil.nextInt(rand, 0, numClients); + return (idx == numClients) ? CLOUD_CLIENT : CLIENTS.get(idx); + } + + public static void waitForRecoveriesToFinish(CloudSolrClient client) throws Exception { + assert null != client.getDefaultCollection(); + AbstractDistribZkTestBase.waitForRecoveriesToFinish(client.getDefaultCollection(), + client.getZkStateReader(), + true, true, 330); + } + + /** + * Use the schema API to verify that the specified expected Field exists with those exact attributes. 
+  public static SolrClient getRandClient(Random rand) {
+    int numClients = CLIENTS.size();
+    int idx = TestUtil.nextInt(rand, 0, numClients);
+    return (idx == numClients) ? CLOUD_CLIENT : CLIENTS.get(idx);
+  }
+
+  public static void waitForRecoveriesToFinish(CloudSolrClient client) throws Exception {
+    assert null != client.getDefaultCollection();
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(client.getDefaultCollection(),
+                                                        client.getZkStateReader(),
+                                                        true, true, 330);
+  }
+
+  /**
+   * Use the schema API to verify that the specified expected Field exists with those exact attributes.
+   * @see #CLOUD_CLIENT
+   */
+  public static void checkExpectedSchemaField(Map<String,Object> expected) throws Exception {
+    String fieldName = (String) expected.get("name");
+    assertNotNull("expected contains no name: " + expected, fieldName);
+    FieldResponse rsp = new Field(fieldName).process(CLOUD_CLIENT);
+    assertNotNull("Field Null Response: " + fieldName, rsp);
+    assertEquals("Field Status: " + fieldName + " => " + rsp.toString(), 0, rsp.getStatus());
+    assertEquals("Field: " + fieldName, expected, rsp.getField());
+  }
+
+  /**
+   * Use the schema API to verify that the specified expected FieldType exists with those exact attributes.
+   * @see #CLOUD_CLIENT
+   */
+  public static void checkExpectedSchemaType(Map<String,Object> expected) throws Exception {
+    String typeName = (String) expected.get("name");
+    assertNotNull("expected contains no type: " + expected, typeName);
+    FieldTypeResponse rsp = new FieldType(typeName).process(CLOUD_CLIENT);
+    assertNotNull("FieldType Null Response: " + typeName, rsp);
+    assertEquals("FieldType Status: " + typeName + " => " + rsp.toString(), 0, rsp.getStatus());
+    assertEquals("FieldType: " + typeName, expected, rsp.getFieldType().getAttributes());
+  }
+}