SOLR-9159: New cloud based concurrent atomic update test

This commit is contained in:
Chris Hostetter 2016-05-24 13:30:13 -07:00
parent 6e8ca1a094
commit 19044afed4
3 changed files with 523 additions and 0 deletions

View File

@ -325,6 +325,8 @@ Other Changes
* SOLR-9131: Fix "start solr" text in cluster.vm Velocity template (janhoy) * SOLR-9131: Fix "start solr" text in cluster.vm Velocity template (janhoy)
* SOLR-9159: New cloud based concurrent atomic update test (hossman)
================== 6.0.1 ================== ================== 6.0.1 ==================
(No Changes) (No Changes)

View File

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<schema name="minimal-atomic-stress" version="1.6">
<!-- minimal fields needed to stress test atomic updates
See: TestStressCloudBlindAtomicUpdates
-->
<uniqueKey>id</uniqueKey>
<field name="id" type="string" indexed="true" stored="true" docValues="true"/>
<field name="_version_" type="long" indexed="false" stored="false" docValues="true" />
<!-- atomic updates should work on all of these permutations -->
<field name="long_dv" type="long" indexed="false" stored="false" docValues="true" />
<field name="long_dv_stored" type="long" indexed="false" stored="true" docValues="true" />
<field name="long_dv_stored_idx" type="long" indexed="true" stored="true" docValues="true" />
<field name="long_dv_idx" type="long" indexed="true" stored="false" docValues="true" />
<field name="long_stored_idx" type="long" indexed="true" stored="true" docValues="false" />
<fieldType name="string" class="solr.StrField" multiValued="false" indexed="false" stored="false" docValues="false" />
<fieldType name="long" class="solr.TrieLongField" multiValued="false" indexed="false" stored="false" docValues="false"/>
<!-- unused, but play nice with existing solrconfig so we don't have to create a new one just for this test -->
<dynamicField name="*" type="string" indexed="true" stored="true" />
</schema>

View File

@ -0,0 +1,483 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.client.solrj.request.schema.SchemaRequest.Field;
import org.apache.solr.client.solrj.request.schema.SchemaRequest.FieldType;
import org.apache.solr.client.solrj.response.schema.SchemaResponse.FieldResponse;
import org.apache.solr.client.solrj.response.schema.SchemaResponse.FieldTypeResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.TestInjection;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.TestUtil;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Stress test of Atomic Updates in a MinCloud Cluster.
*
* Focus of test is parallel threads hammering updates on diff docs using random clients/nodes,
* Optimistic Concurrency is not used here because of SOLR-8733, instead we just throw lots of
* "inc" operations at a numeric field and check that the math works out at the end.
*/
@Slow
@SuppressSSL(bugUrl="SSL overhead seems to cause OutOfMemory when stress testing")
public class TestStressCloudBlindAtomicUpdates extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String DEBUG_LABEL = MethodHandles.lookup().lookupClass().getName();
private static final String COLLECTION_NAME = "test_col";
/** A basic client for operations at the cloud level, default collection will be set */
private static CloudSolrClient CLOUD_CLIENT;
/** One client per node */
private static ArrayList<HttpSolrClient> CLIENTS = new ArrayList<>(5);
/** Service to execute all parallel work
* @see #NUM_THREADS
*/
private static ExecutorService EXEC_SERVICE;
/** num parallel threads in use by {@link #EXEC_SERVICE} */
private static int NUM_THREADS;
/**
* Used as an increment and multiplier when deciding how many docs should be in
* the test index. 1 means every doc in the index is a candidate for updates, bigger numbers mean a
* larger index is used (so tested docs are more likeely to be spread out in multiple segments)
*/
private static int DOC_ID_INCR;
@BeforeClass
private static void createMiniSolrCloudCluster() throws Exception {
// NOTE: numDocsToCheck uses atLeast, so nightly & multiplier are alreayd a factor in index size
// no need to redundently factor them in here as well
DOC_ID_INCR = TestUtil.nextInt(random(), 1, 7);
NUM_THREADS = atLeast(3);
EXEC_SERVICE = ExecutorUtil.newMDCAwareFixedThreadPool
(NUM_THREADS, new DefaultSolrThreadFactory(DEBUG_LABEL));
// at least 2, but don't go crazy on nightly/test.multiplier with "atLeast()"
final int numShards = TEST_NIGHTLY ? 5 : 2;
final int repFactor = 2;
final int numNodes = numShards * repFactor;
final String configName = DEBUG_LABEL + "_config-set";
final Path configDir = Paths.get(TEST_HOME(), "collection1", "conf");
configureCluster(numNodes).addConfig(configName, configDir).configure();
Map<String, String> collectionProperties = new HashMap<>();
collectionProperties.put("config", "solrconfig-tlog.xml");
collectionProperties.put("schema", "schema-minimal-atomic-stress.xml");
assertNotNull(cluster.createCollection(COLLECTION_NAME, numShards, repFactor,
configName, null, null, collectionProperties));
CLOUD_CLIENT = cluster.getSolrClient();
CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
waitForRecoveriesToFinish(CLOUD_CLIENT);
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
CLIENTS.add(getHttpSolrClient(jetty.getBaseUrl() + "/" + COLLECTION_NAME + "/"));
}
// sanity check no one broke the assumptions we make about our schema
checkExpectedSchemaType( map("name","long",
"class","solr.TrieLongField",
"multiValued",Boolean.FALSE,
"indexed",Boolean.FALSE,
"stored",Boolean.FALSE,
"docValues",Boolean.FALSE) );
}
@AfterClass
private static void afterClass() throws Exception {
TestInjection.reset();
ExecutorUtil.shutdownAndAwaitTermination(EXEC_SERVICE);
EXEC_SERVICE = null;
CLOUD_CLIENT.close(); CLOUD_CLIENT = null;
for (HttpSolrClient client : CLIENTS) {
client.close();
}
CLIENTS = null;
}
@Before
private void clearCloudCollection() throws Exception {
assertEquals(0, CLOUD_CLIENT.deleteByQuery("*:*").getStatus());
assertEquals(0, CLOUD_CLIENT.optimize().getStatus());
TestInjection.reset();
final int injectionPercentage = (int)Math.ceil(atLeast(1) / 2);
final String testInjection = usually() ? "false:0" : ("true:" + injectionPercentage);
log.info("TestInjection: fail replica, update pause, tlog pauses: " + testInjection);
TestInjection.failReplicaRequests = testInjection;
TestInjection.updateLogReplayRandomPause = testInjection;
TestInjection.updateRandomPause = testInjection;
}
public void test_dv() throws Exception {
String field = "long_dv";
checkExpectedSchemaField(map("name", field,
"type","long",
"stored",Boolean.FALSE,
"indexed",Boolean.FALSE,
"docValues",Boolean.TRUE));
checkField(field);
}
public void test_dv_stored() throws Exception {
String field = "long_dv_stored";
checkExpectedSchemaField(map("name", field,
"type","long",
"stored",Boolean.TRUE,
"indexed",Boolean.FALSE,
"docValues",Boolean.TRUE));
checkField(field);
}
public void test_dv_stored_idx() throws Exception {
String field = "long_dv_stored_idx";
checkExpectedSchemaField(map("name", field,
"type","long",
"stored",Boolean.TRUE,
"indexed",Boolean.TRUE,
"docValues",Boolean.TRUE));
checkField(field);
}
public void test_dv_idx() throws Exception {
String field = "long_dv_idx";
checkExpectedSchemaField(map("name", field,
"type","long",
"stored",Boolean.FALSE,
"indexed",Boolean.TRUE,
"docValues",Boolean.TRUE));
checkField(field);
}
public void test_stored_idx() throws Exception {
String field = "long_stored_idx";
checkExpectedSchemaField(map("name", field,
"type","long",
"stored",Boolean.TRUE,
"indexed",Boolean.TRUE,
"docValues",Boolean.FALSE));
checkField(field);
}
public void checkField(final String numericFieldName) throws Exception {
final CountDownLatch abortLatch = new CountDownLatch(1);
final int numDocsToCheck = atLeast(37);
final int numDocsInIndex = (numDocsToCheck * DOC_ID_INCR);
final AtomicLong[] expected = new AtomicLong[numDocsToCheck];
log.info("Testing " + numericFieldName + ": numDocsToCheck=" + numDocsToCheck + ", numDocsInIndex=" + numDocsInIndex + ", incr=" + DOC_ID_INCR);
// seed the index & keep track of what docs exist and with what values
for (int id = 0; id < numDocsInIndex; id++) {
// NOTE: the field we're mutating is a long, but we seed with a random int,
// and we will inc/dec by random smaller ints, to ensure we never over/under flow
final int initValue = random().nextInt();
SolrInputDocument doc = doc(f("id",""+id), f(numericFieldName, initValue));
UpdateResponse rsp = update(doc).process(CLOUD_CLIENT);
assertEquals(doc.toString() + " => " + rsp.toString(), 0, rsp.getStatus());
if (0 == id % DOC_ID_INCR) {
expected[(int)(id / DOC_ID_INCR)] = new AtomicLong(initValue);
}
}
assertNotNull("Sanity Check no off-by-one in expected init: ", expected[expected.length-1]);
// sanity check index contents
assertEquals(0, CLOUD_CLIENT.commit().getStatus());
assertEquals(numDocsInIndex,
CLOUD_CLIENT.query(params("q", "*:*")).getResults().getNumFound());
// spin up parallel workers to hammer updates
List<Future<Worker>> results = new ArrayList<Future<Worker>>(NUM_THREADS);
for (int workerId = 0; workerId < NUM_THREADS; workerId++) {
Worker worker = new Worker(workerId, expected, abortLatch, new Random(random().nextLong()),
numericFieldName);
// ask for the Worker to be returned in the Future so we can inspect it
results.add(EXEC_SERVICE.submit(worker, worker));
}
// check the results of all our workers
for (Future<Worker> r : results) {
try {
Worker w = r.get();
if (! w.getFinishedOk() ) {
// quick and dirty sanity check if any workers didn't succeed, but didn't throw an exception either
abortLatch.countDown();
log.error("worker={} didn't finish ok, but didn't throw exception?", w.workerId);
}
} catch (ExecutionException ee) {
Throwable rootCause = ee.getCause();
if (rootCause instanceof Error) {
// low level error, or test assertion failure - either way don't leave it wrapped
log.error("Worker exec Error, throwing root cause", ee);
throw (Error) rootCause;
} else {
log.error("Worker ExecutionException, re-throwing", ee);
throw ee;
}
}
}
assertEquals("Abort latch has changed, why didn't we get an exception from a worker?",
1L, abortLatch.getCount());
TestInjection.reset();
waitForRecoveriesToFinish(CLOUD_CLIENT);
// check all the final index contents match our expectations
int incorrectDocs = 0;
for (int id = 0; id < numDocsInIndex; id += DOC_ID_INCR) {
assert 0 == id % DOC_ID_INCR : "WTF? " + id;
final long expect = expected[(int)(id / DOC_ID_INCR)].longValue();
final String docId = "" + id;
// sometimes include an fq on the expected value to ensure the updated values
// are "visible" for searching
final SolrParams p = (0 != TestUtil.nextInt(random(), 0,15))
? params() : params("fq",numericFieldName + ":" + expect);
SolrDocument doc = getRandClient(random()).getById(docId, p);
final boolean foundWithFilter = (null != doc);
if (! foundWithFilter) {
// try again w/o fq to see what it does have
doc = getRandClient(random()).getById(docId);
}
Long actual = (null == doc) ? null : (Long) doc.getFirstValue(numericFieldName);
if (actual == null || expect != actual.longValue() || ! foundWithFilter) {
log.error("docId={}, foundWithFilter={}, expected={}, actual={}",
docId, foundWithFilter, expect, actual);
incorrectDocs++;
}
}
assertEquals("Some docs had errors -- check logs", 0, incorrectDocs);
}
public static final class Worker implements Runnable {
public final int workerId;
final AtomicLong[] expected;
final CountDownLatch abortLatch;
final Random rand;
final String updateField;
final int numDocsToUpdate;
boolean ok = false; // set to true only on successful completion
public Worker(int workerId, AtomicLong[] expected, CountDownLatch abortLatch, Random rand,
String updateField) {
this.workerId = workerId;
this.expected = expected;
this.abortLatch = abortLatch;
this.rand = rand;
this.updateField = updateField;
this.numDocsToUpdate = atLeast(rand, 25);
}
public boolean getFinishedOk() {
return ok;
}
private void doRandomAtomicUpdate(int docId) throws Exception {
assert 0 == docId % DOC_ID_INCR : "WTF? " + docId;
final int delta = TestUtil.nextInt(rand, -1000, 1000);
log.info("worker={}, docId={}, delta={}", workerId, docId, delta);
SolrClient client = getRandClient(rand);
SolrInputDocument doc = doc(f("id",""+docId),
f(updateField,Collections.singletonMap("inc",delta)));
UpdateResponse rsp = update(doc).process(client);
assertEquals(doc + " => " + rsp, 0, rsp.getStatus());
AtomicLong counter = expected[(int)(docId / DOC_ID_INCR)];
assertNotNull("null counter for " + docId + "/" + DOC_ID_INCR, counter);
counter.getAndAdd(delta);
}
public void run() {
final String origThreadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName(origThreadName + "-w" + workerId);
final int maxDocMultiplier = expected.length-1;
for (int docIter = 0; docIter < numDocsToUpdate; docIter++) {
final int docId = DOC_ID_INCR * TestUtil.nextInt(rand, 0, maxDocMultiplier);
// tweak our thread name to keep track of what we're up to
Thread.currentThread().setName(origThreadName + "-w" + workerId + "-d" + docId);
// no matter how random the doc selection may be per thread, ensure
// every doc that is selected by *a* thread gets at least a couple rapid fire updates
final int itersPerDoc = atLeast(rand, 2);
for (int updateIter = 0; updateIter < itersPerDoc; updateIter++) {
if (0 == abortLatch.getCount()) {
return;
}
doRandomAtomicUpdate(docId);
}
if (rand.nextBoolean()) { Thread.yield(); }
}
} catch (Error err) {
log.error(Thread.currentThread().getName(), err);
abortLatch.countDown();
throw err;
} catch (Exception ex) {
log.error(Thread.currentThread().getName(), ex);
abortLatch.countDown();
throw new RuntimeException(ex.getMessage(), ex);
} finally {
Thread.currentThread().setName(origThreadName);
}
ok = true;
}
}
public static UpdateRequest update(SolrInputDocument... docs) {
return update(null, docs);
}
public static UpdateRequest update(SolrParams params, SolrInputDocument... docs) {
UpdateRequest r = new UpdateRequest();
if (null != params) {
r.setParams(new ModifiableSolrParams(params));
}
r.add(Arrays.asList(docs));
return r;
}
public static SolrInputDocument doc(SolrInputField... fields) {
SolrInputDocument doc = new SolrInputDocument();
for (SolrInputField f : fields) {
doc.put(f.getName(), f);
}
return doc;
}
public static SolrInputField f(String fieldName, Object... values) {
SolrInputField f = new SolrInputField(fieldName);
f.setValue(values, 1.0F);
// TODO: soooooooooo stupid (but currently neccessary because atomic updates freak out
// if the Map with the "inc" operation is inside of a collection - even if it's the only "value") ...
if (1 == values.length) {
f.setValue(values[0], 1.0F);
} else {
f.setValue(values, 1.0F);
}
return f;
}
public static SolrClient getRandClient(Random rand) {
int numClients = CLIENTS.size();
int idx = TestUtil.nextInt(rand, 0, numClients);
return (idx == numClients) ? CLOUD_CLIENT : CLIENTS.get(idx);
}
public static void waitForRecoveriesToFinish(CloudSolrClient client) throws Exception {
assert null != client.getDefaultCollection();
AbstractDistribZkTestBase.waitForRecoveriesToFinish(client.getDefaultCollection(),
client.getZkStateReader(),
true, true, 330);
}
/**
* Use the schema API to verify that the specified expected Field exists with those exact attributes.
* @see #CLOUD_CLIENT
*/
public static void checkExpectedSchemaField(Map<String,Object> expected) throws Exception {
String fieldName = (String) expected.get("name");
assertNotNull("expected contains no name: " + expected, fieldName);
FieldResponse rsp = new Field(fieldName).process(CLOUD_CLIENT);
assertNotNull("Field Null Response: " + fieldName, rsp);
assertEquals("Field Status: " + fieldName + " => " + rsp.toString(), 0, rsp.getStatus());
assertEquals("Field: " + fieldName, expected, rsp.getField());
}
/**
* Use the schema API to verify that the specified expected FieldType exists with those exact attributes.
* @see #CLOUD_CLIENT
*/
public static void checkExpectedSchemaType(Map<String,Object> expected) throws Exception {
String typeName = (String) expected.get("name");
assertNotNull("expected contains no type: " + expected, typeName);
FieldTypeResponse rsp = new FieldType(typeName).process(CLOUD_CLIENT);
assertNotNull("FieldType Null Response: " + typeName, rsp);
assertEquals("FieldType Status: " + typeName + " => " + rsp.toString(), 0, rsp.getStatus());
assertEquals("FieldType: " + typeName, expected, rsp.getFieldType().getAttributes());
}
}