SOLR-12338: Replay buffering tlog in parallel

Cao Manh Dat 2018-05-30 11:05:48 +07:00
parent 8821b80616
commit 6084da559c
13 changed files with 468 additions and 23 deletions

View File

@@ -300,6 +300,8 @@ Optimizations
The /export (ExportQParserPlugin) would declare incorrectly that scores are needed.
Expanded docs (expand component) could be told incorrectly that scores are needed. (David Smiley)
* SOLR-12338: Replay buffering tlog in parallel. (Cao Manh Dat, David Smiley)
Other Changes
----------------------

View File

@@ -110,6 +110,7 @@ import org.apache.solr.security.SecurityPluginHolder;
import org.apache.solr.update.SolrCoreState;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.OrderedExecutor;
import org.apache.solr.util.stats.MetricUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -161,6 +162,8 @@ public class CoreContainer {
private ExecutorService coreContainerWorkExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
new DefaultSolrThreadFactory("coreContainerWorkExecutor") );
private final OrderedExecutor replayUpdatesExecutor;
protected LogWatcher logging = null;
private CloserThread backgroundCloser = null;
@@ -302,6 +305,11 @@ public class CoreContainer {
this.coresLocator = locator;
this.containerProperties = new Properties(properties);
this.asyncSolrCoreLoad = asyncSolrCoreLoad;
this.replayUpdatesExecutor = new OrderedExecutor(
cfg.getReplayUpdatesThreads(),
ExecutorUtil.newMDCAwareCachedThreadPool(
cfg.getReplayUpdatesThreads(),
new DefaultSolrThreadFactory("replayUpdatesExecutor")));
}
private synchronized void initializeAuthorizationPlugin(Map<String, Object> authorizationConf) {
@@ -443,6 +451,7 @@ public class CoreContainer {
coresLocator = null;
cfg = null;
containerProperties = null;
replayUpdatesExecutor = null;
}
public static CoreContainer createAndLoad(Path solrHome) {
@@ -487,6 +496,10 @@ public class CoreContainer {
return metricsHistoryHandler;
}
public OrderedExecutor getReplayUpdatesExecutor() {
return replayUpdatesExecutor;
}
//-------------------------------------------------------------------
// Initialization / Cleanup
//-------------------------------------------------------------------
@@ -777,6 +790,7 @@ public class CoreContainer {
isShutDown = true;
ExecutorUtil.shutdownAndAwaitTermination(coreContainerWorkExecutor);
replayUpdatesExecutor.shutdownAndAwaitTermination();
if (metricManager != null) {
metricManager.closeReporters(SolrMetricManager.getRegistryName(SolrInfoBean.Group.node));
metricManager.closeReporters(SolrMetricManager.getRegistryName(SolrInfoBean.Group.jvm));

View File

@@ -59,6 +59,8 @@ public class NodeConfig {
private final Integer coreLoadThreads;
private final int replayUpdatesThreads;
@Deprecated
// This should be part of the transientCacheConfig, remove in 7.0
private final int transientCacheSize;
@@ -77,7 +79,7 @@ public class NodeConfig {
PluginInfo shardHandlerFactoryConfig, UpdateShardHandlerConfig updateShardHandlerConfig,
String coreAdminHandlerClass, String collectionsAdminHandlerClass,
String healthCheckHandlerClass, String infoHandlerClass, String configSetsHandlerClass,
LogWatcherConfig logWatcherConfig, CloudConfig cloudConfig, Integer coreLoadThreads,
LogWatcherConfig logWatcherConfig, CloudConfig cloudConfig, Integer coreLoadThreads, int replayUpdatesThreads,
int transientCacheSize, boolean useSchemaCache, String managementPath, SolrResourceLoader loader,
Properties solrProperties, PluginInfo[] backupRepositoryPlugins,
MetricsConfig metricsConfig, PluginInfo transientCacheConfig) {
@@ -96,6 +98,7 @@ public class NodeConfig {
this.logWatcherConfig = logWatcherConfig;
this.cloudConfig = cloudConfig;
this.coreLoadThreads = coreLoadThreads;
this.replayUpdatesThreads = replayUpdatesThreads;
this.transientCacheSize = transientCacheSize;
this.useSchemaCache = useSchemaCache;
this.managementPath = managementPath;
@@ -137,6 +140,10 @@ public class NodeConfig {
: coreLoadThreads;
}
public int getReplayUpdatesThreads() {
return replayUpdatesThreads;
}
public String getSharedLibDirectory() {
return sharedLibDirectory;
}
@@ -222,6 +229,7 @@ public class NodeConfig {
private LogWatcherConfig logWatcherConfig = new LogWatcherConfig(true, null, null, 50);
private CloudConfig cloudConfig;
private int coreLoadThreads = DEFAULT_CORE_LOAD_THREADS;
private int replayUpdatesThreads = Runtime.getRuntime().availableProcessors();
@Deprecated
//Remove in 7.0 and put it all in the transientCache element in solrconfig.xml
private int transientCacheSize = DEFAULT_TRANSIENT_CACHE_SIZE;
@@ -341,6 +349,11 @@ public class NodeConfig {
return this;
}
public NodeConfigBuilder setReplayUpdatesThreads(int replayUpdatesThreads) {
this.replayUpdatesThreads = replayUpdatesThreads;
return this;
}
// Remove in Solr 7.0
@Deprecated
public NodeConfigBuilder setTransientCacheSize(int transientCacheSize) {
@@ -381,7 +394,7 @@ public class NodeConfig {
public NodeConfig build() {
return new NodeConfig(nodeName, coreRootDirectory, solrDataHome, configSetBaseDirectory, sharedLibDirectory, shardHandlerFactoryConfig,
updateShardHandlerConfig, coreAdminHandlerClass, collectionsAdminHandlerClass, healthCheckHandlerClass, infoHandlerClass, configSetsHandlerClass,
logWatcherConfig, cloudConfig, coreLoadThreads, transientCacheSize, useSchemaCache, managementPath, loader, solrProperties,
logWatcherConfig, cloudConfig, coreLoadThreads, replayUpdatesThreads, transientCacheSize, useSchemaCache, managementPath, loader, solrProperties,
backupRepositoryPlugins, metricsConfig, transientCacheConfig);
}
}

View File

@@ -271,6 +271,9 @@ public class SolrXmlConfig {
case "coreLoadThreads":
builder.setCoreLoadThreads(parseInt(name, value));
break;
case "replayUpdatesThreads":
builder.setReplayUpdatesThreads(parseInt(name, value));
break;
case "transientCacheSize":
builder.setTransientCacheSize(parseInt(name, value));
break;

View File

@@ -42,6 +42,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
@@ -55,6 +58,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoBean;
@@ -69,9 +73,11 @@ import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.OrderedExecutor;
import org.apache.solr.util.RTimer;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
import org.apache.solr.util.plugin.PluginInfoInitialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1667,12 +1673,9 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
this.inSortedOrder = inSortedOrder;
}
private SolrQueryRequest req;
private SolrQueryResponse rsp;
@Override
public void run() {
ModifiableSolrParams params = new ModifiableSolrParams();
@@ -1683,7 +1686,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); // setting request info will help logging
try {
for(;;) {
for (; ; ) {
TransactionLog translog = translogs.pollFirst();
if (translog == null) break;
doReplay(translog);
@@ -1743,6 +1746,9 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null);
UpdateRequestProcessor proc = processorChain.createProcessor(req, rsp);
OrderedExecutor executor = inSortedOrder ? null : req.getCore().getCoreContainer().getReplayUpdatesExecutor();
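// pendingTasks counts updates handed to the executor but not yet completed;
// exceptionOnExecuteUpdate records the first fatal exception seen on any replay thread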
AtomicInteger pendingTasks = new AtomicInteger(0);
AtomicReference<SolrException> exceptionOnExecuteUpdate = new AtomicReference<>();
long commitVersion = 0;
int operationAndFlags = 0;
@@ -1771,6 +1777,11 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
o = tlogReader.next();
if (o == null && activeLog) {
if (!finishing) {
// we are about to block all updates, including tasks already handed to the executor,
// so we must wait for those tasks to finish first
waitForAllUpdatesGetExecuted(pendingTasks);
// from this point on, the remaining updates are executed in a single thread
executor = null;
// block to prevent new adds, but don't immediately unlock since
// we could be starved from ever completing recovery. Only unlock
// after we've finished this recovery.
@@ -1795,6 +1806,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
if (o == null) break;
// fail fast
if (exceptionOnExecuteUpdate.get() != null) throw exceptionOnExecuteUpdate.get();
try {
@@ -1811,7 +1824,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
if (debug) log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
proc.processAdd(cmd);
execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate);
break;
}
case UpdateLog.DELETE: {
@@ -1822,7 +1835,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
if (debug) log.debug("delete " + cmd);
proc.processDelete(cmd);
execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate);
break;
}
@@ -1834,7 +1847,9 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
if (debug) log.debug("deleteByQuery " + cmd);
proc.processDelete(cmd);
waitForAllUpdatesGetExecuted(pendingTasks);
// a DBQ can match any document, so it runs in the current thread
// only after all pending bucketed updates have finished
execute(cmd, null, pendingTasks, proc, exceptionOnExecuteUpdate);
break;
}
case UpdateLog.COMMIT: {
@@ -1857,30 +1872,21 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
} else {
// XXX should not happen?
}
} catch (IOException ex) {
recoveryInfo.errors++;
loglog.warn("REPLAY_ERR: IOException reading log", ex);
// could be caused by an incomplete flush if recovering from log
} catch (ClassCastException cl) {
recoveryInfo.errors++;
loglog.warn("REPLAY_ERR: Unexpected log entry or corrupt log. Entry=" + o, cl);
// would be caused by a corrupt transaction log
} catch (SolrException ex) {
if (ex.code() == ErrorCode.SERVICE_UNAVAILABLE.code) {
throw ex;
}
recoveryInfo.errors++;
loglog.warn("REPLAY_ERR: IOException reading log", ex);
// could be caused by an incomplete flush if recovering from log
} catch (Exception ex) {
recoveryInfo.errors++;
loglog.warn("REPLAY_ERR: Exception replaying log", ex);
// something wrong with the request?
}
assert TestInjection.injectUpdateLogReplayRandomPause();
}
waitForAllUpdatesGetExecuted(pendingTasks);
if (exceptionOnExecuteUpdate.get() != null) throw exceptionOnExecuteUpdate.get();
CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
cmd.setVersion(commitVersion);
cmd.softCommit = false;
@@ -1917,6 +1923,93 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
translog.decref();
}
}
private void waitForAllUpdatesGetExecuted(AtomicInteger pendingTasks) {
TimeOut timeOut = new TimeOut(Integer.MAX_VALUE, TimeUnit.MILLISECONDS, TimeSource.CURRENT_TIME);
try {
timeOut.waitFor("Timeout waiting for replay updates finish", () -> {
//TODO handle the case when there are no progress after a long time
return pendingTasks.get() == 0;
});
} catch (TimeoutException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
private Integer getBucketHash(UpdateCommand cmd) {
if (cmd instanceof AddUpdateCommand) {
BytesRef idBytes = ((AddUpdateCommand)cmd).getIndexedId();
if (idBytes == null) return null;
return DistributedUpdateProcessor.bucketHash(idBytes);
}
if (cmd instanceof DeleteUpdateCommand) {
BytesRef idBytes = ((DeleteUpdateCommand)cmd).getIndexedId();
if (idBytes == null) return null;
return DistributedUpdateProcessor.bucketHash(idBytes);
}
return null;
}
private void execute(UpdateCommand cmd, OrderedExecutor executor,
AtomicInteger pendingTasks, UpdateRequestProcessor proc,
AtomicReference<SolrException> exceptionHolder) {
assert cmd instanceof AddUpdateCommand || cmd instanceof DeleteUpdateCommand;
if (executor != null) {
// by using the same hash as DistributedUpdateProcessor (DUP), independent updates avoid waiting on the same bucket
executor.execute(getBucketHash(cmd), () -> {
try {
// fail fast
if (exceptionHolder.get() != null) return;
if (cmd instanceof AddUpdateCommand) {
proc.processAdd((AddUpdateCommand) cmd);
} else {
proc.processDelete((DeleteUpdateCommand) cmd);
}
} catch (IOException e) {
recoveryInfo.errors++;
loglog.warn("REPLAY_ERR: IOException reading log", e);
// could be caused by an incomplete flush if recovering from log
} catch (SolrException e) {
if (e.code() == ErrorCode.SERVICE_UNAVAILABLE.code) {
exceptionHolder.compareAndSet(null, e);
return;
}
recoveryInfo.errors++;
loglog.warn("REPLAY_ERR: IOException reading log", e);
} finally {
pendingTasks.decrementAndGet();
}
});
pendingTasks.incrementAndGet();
} else {
try {
if (cmd instanceof AddUpdateCommand) {
proc.processAdd((AddUpdateCommand) cmd);
} else {
proc.processDelete((DeleteUpdateCommand) cmd);
}
} catch (IOException e) {
recoveryInfo.errors++;
loglog.warn("REPLAY_ERR: IOException replaying log", e);
// could be caused by an incomplete flush if recovering from log
} catch (SolrException e) {
if (e.code() == ErrorCode.SERVICE_UNAVAILABLE.code) {
throw e;
}
recoveryInfo.errors++;
loglog.warn("REPLAY_ERR: IOException replaying log", e);
}
}
}
}
/**

View File

@@ -957,6 +957,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
isIndexChanged = true;
}
public static int bucketHash(BytesRef idBytes) {
assert idBytes != null;
return Hash.murmurhash3_x86_32(idBytes.bytes, idBytes.offset, idBytes.length, 0);
}
/**
* @return whether or not to drop this cmd
* @throws IOException If there is a low-level I/O error.
@@ -981,7 +986,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
// This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here)
int bucketHash = Hash.murmurhash3_x86_32(idBytes.bytes, idBytes.offset, idBytes.length, 0);
int bucketHash = bucketHash(idBytes);
// at this point, there is an update we need to try and apply.
// we may or may not be the leader.
@@ -1745,7 +1750,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
// This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here)
int bucketHash = Hash.murmurhash3_x86_32(idBytes.bytes, idBytes.offset, idBytes.length, 0);
int bucketHash = bucketHash(idBytes);
// at this point, there is an update we need to try and apply.
// we may or may not be the leader.

View File

@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.util;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import org.apache.solr.common.util.ExecutorUtil;
public class OrderedExecutor implements Executor {
private final ExecutorService delegate;
private final SparseStripedLock<Integer> sparseStripedLock;
public OrderedExecutor(int numThreads, ExecutorService delegate) {
this.delegate = delegate;
this.sparseStripedLock = new SparseStripedLock<>(numThreads);
}
@Override
public void execute(Runnable runnable) {
execute(null, runnable);
}
/**
* Execute the given command in the future.
* If another command with the same {@code lockId} is waiting in the queue or running,
* this method will block until that command finishes.
* Therefore different commands with the same {@code lockId} are executed in the order
* in which this method was called.
*
* If multiple callers are waiting for a command to finish, there is no guarantee that the earliest caller will win.
*
* @param lockId the lock id of the {@code command}; if null, the command is not ordered relative to any other command
* @param command the runnable task
*
* @throws RejectedExecutionException if this task cannot be accepted for execution
*/
public void execute(Integer lockId, Runnable command) {
try {
sparseStripedLock.add(lockId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
try {
if (delegate.isShutdown()) throw new RejectedExecutionException();
delegate.execute(() -> {
try {
command.run();
} finally {
sparseStripedLock.remove(lockId);
}
});
} catch (RejectedExecutionException e) {
sparseStripedLock.remove(lockId);
throw e;
}
}
public void shutdownAndAwaitTermination() {
ExecutorUtil.shutdownAndAwaitTermination(delegate);
}
/** A set of locks keyed by {@code T}, similar to Guava's Striped, but the keys are sparse/lazy. */
private static class SparseStripedLock<T> {
private ConcurrentHashMap<T, CountDownLatch> map = new ConcurrentHashMap<>();
private final Semaphore sizeSemaphore;
SparseStripedLock(int maxSize) {
this.sizeSemaphore = new Semaphore(maxSize);
}
public void add(T t) throws InterruptedException {
if (t != null) {
CountDownLatch myLock = new CountDownLatch(1);
CountDownLatch existingLock = map.putIfAbsent(t, myLock);
while (existingLock != null) {
// wait for existing lock/permit to become available (see remove() below)
existingLock.await();
existingLock = map.putIfAbsent(t, myLock);
}
// myLock was successfully inserted
}
// won the lock
sizeSemaphore.acquire();
}
public void remove(T t) {
if (t != null) {
// remove and signal to any "await"-ers
map.remove(t).countDown();
}
sizeSemaphore.release();
}
}
}
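For illustration, a minimal usage sketch (the demo class and pool name are hypothetical, not part of this patch): tasks submitted with the same lockId run in submission order, while tasks with different lockIds may run in parallel on the delegate pool.

import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.util.OrderedExecutor;

public class OrderedExecutorDemo {
  public static void main(String[] args) {
    // the semaphore inside OrderedExecutor caps in-flight tasks at 4 here
    OrderedExecutor exec = new OrderedExecutor(4,
        ExecutorUtil.newMDCAwareCachedThreadPool("orderedExecutorDemo"));
    for (int i = 0; i < 8; i++) {
      final int op = i;
      // ops sharing a lockId (think: one document's bucket) are serialized;
      // buckets 0 and 1 are free to interleave
      exec.execute(op % 2, () -> System.out.println("bucket " + (op % 2) + " op " + op));
    }
    exec.shutdownAndAwaitTermination(); // shuts down the delegate and waits for it to drain
  }
}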

View File

@@ -26,6 +26,7 @@
<str name="sharedLib">testSharedLib</str>
<str name="shareSchema">${shareSchema:true}</str>
<int name="transientCacheSize">66</int>
<int name="replayUpdatesThreads">100</int>
<solrcloud>
<int name="distribUpdateConnTimeout">22</int>

View File

@@ -74,6 +74,7 @@ public class TestSolrXml extends SolrTestCaseJ4 {
assertEquals("info handler class", "testInfoHandler", cfg.getInfoHandlerClass());
assertEquals("config set handler class", "testConfigSetsHandler", cfg.getConfigSetsHandlerClass());
assertEquals("core load threads", 11, cfg.getCoreLoadThreadCount(false));
assertEquals("replay update threads", 100, cfg.getReplayUpdatesThreads());
assertThat("core root dir", cfg.getCoreRootDirectory().toString(), containsString("testCoreRootDirectory"));
assertEquals("distrib conn timeout", 22, cfg.getUpdateShardHandlerConfig().getDistributedConnectionTimeout());
assertEquals("distrib socket timeout", 33, cfg.getUpdateShardHandlerConfig().getDistributedSocketTimeout());

View File

@@ -47,8 +47,10 @@ import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
@@ -100,6 +102,85 @@ public class TestRecovery extends SolrTestCaseJ4 {
return registry.getMetrics();
}
@Test
public void stressLogReplay() throws Exception {
final int NUM_UPDATES = 150;
try {
DirectUpdateHandler2.commitOnClose = false;
final Semaphore logReplay = new Semaphore(0);
final Semaphore logReplayFinish = new Semaphore(0);
UpdateLog.testing_logReplayHook = () -> {
try {
assertTrue(logReplay.tryAcquire(timeout, TimeUnit.SECONDS));
} catch (Exception e) {
throw new RuntimeException(e);
}
};
UpdateLog.testing_logReplayFinishHook = logReplayFinish::release;
clearIndex();
assertU(commit());
Map<Integer, Integer> docIdToVal = new HashMap<>();
for (int i = 0; i < NUM_UPDATES; i++) {
int kindOfUpdate = random().nextInt(100);
if (docIdToVal.size() < 10) kindOfUpdate = 0;
if (kindOfUpdate <= 50) {
// add a new document; its id may duplicate an existing one
int val = random().nextInt(1000);
int docId = random().nextInt(10000);
addAndGetVersion(sdoc("id", String.valueOf(docId), "val_i_dvo", val), null);
docIdToVal.put(docId, val);
} else if (kindOfUpdate <= 80) {
// inc val of a document
ArrayList<Integer> ids = new ArrayList<>(docIdToVal.keySet());
int docId = ids.get(random().nextInt(ids.size()));
int delta = random().nextInt(10);
addAndGetVersion(sdoc("id", String.valueOf(docId), "val_i_dvo", map("inc", delta)), null);
docIdToVal.put(docId, docIdToVal.get(docId) + delta);
} else if (kindOfUpdate <= 85) {
// set val of a document
ArrayList<Integer> ids = new ArrayList<>(docIdToVal.keySet());
int docId = ids.get(random().nextInt(ids.size()));
int val = random().nextInt(1000);
addAndGetVersion(sdoc("id", String.valueOf(docId), "val_i_dvo", map("set", val)), null);
docIdToVal.put(docId, val);
} else if (kindOfUpdate <= 90) {
// delete by query, matching docs on their val
ArrayList<Integer> vals = new ArrayList<>(docIdToVal.values());
int val = vals.get(random().nextInt(vals.size()));
deleteByQueryAndGetVersion("val_i_dvo:"+val, null);
docIdToVal.entrySet().removeIf(e -> e.getValue() == val);
} else {
// delete by id
ArrayList<Integer> ids = new ArrayList<>(docIdToVal.keySet());
int docId = ids.get(random().nextInt(ids.size()));
deleteAndGetVersion(String.valueOf(docId), null);
docIdToVal.remove(docId);
}
}
h.close();
createCore();
assertJQ(req("q","*:*") ,"/response/numFound==0");
// unblock recovery
logReplay.release(Integer.MAX_VALUE);
assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
assertU(commit());
assertJQ(req("q","*:*") ,"/response/numFound=="+docIdToVal.size());
for (Map.Entry<Integer, Integer> entry : docIdToVal.entrySet()) {
assertJQ(req("q","id:"+entry.getKey(), "fl", "val_i_dvo") ,
"/response/numFound==1",
"/response/docs==[{'val_i_dvo':"+entry.getValue()+"}]");
}
} finally {
DirectUpdateHandler2.commitOnClose = true;
UpdateLog.testing_logReplayHook = null;
UpdateLog.testing_logReplayFinishHook = null;
}
}
@Test
public void testLogReplay() throws Exception {

View File

@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.util;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.common.util.ExecutorUtil;
import org.junit.Test;
public class OrderedExecutorTest extends LuceneTestCase {
@Test
public void testExecutionInOrder() {
OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("executeInOrderTest"));
IntBox intBox = new IntBox();
for (int i = 0; i < 100; i++) {
orderedExecutor.execute(1, () -> intBox.value++);
}
orderedExecutor.shutdownAndAwaitTermination();
assertEquals(100, intBox.value);
}
@Test
public void testLockWhenQueueIsFull() {
OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull"));
IntBox intBox = new IntBox();
long t = System.nanoTime();
orderedExecutor.execute(1, () -> {
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
intBox.value++;
});
assertTrue(System.nanoTime() - t < 100 * 1000000);
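// a second task with the same lockId must block in execute() until the first finishes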
t = System.nanoTime();
orderedExecutor.execute(1, () -> {
intBox.value++;
});
assertTrue(System.nanoTime() - t > 300 * 1000000);
orderedExecutor.shutdownAndAwaitTermination();
assertEquals(2, intBox.value);
}
@Test
public void testRunInParallel() {
OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull"));
AtomicInteger atomicInteger = new AtomicInteger(0);
orderedExecutor.execute(1, () -> {
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
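// if the lockId-2 task ran in parallel during the sleep, the counter is already 1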
if (atomicInteger.get() == 1) atomicInteger.incrementAndGet();
});
orderedExecutor.execute(2, atomicInteger::incrementAndGet);
orderedExecutor.shutdownAndAwaitTermination();
assertEquals(2, atomicInteger.get());
}
@Test
public void testStress() {
int N = random().nextInt(50) + 20;
Map<Integer, Integer> base = new HashMap<>();
Map<Integer, Integer> run = new HashMap<>();
for (int i = 0; i < N; i++) {
base.put(i, i);
run.put(i, i);
}
OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("testStress"));
for (int i = 0; i < 1000; i++) {
int key = random().nextInt(N);
base.put(key, base.get(key) + 1);
orderedExecutor.execute(key, () -> run.put(key, run.get(key) + 1));
}
orderedExecutor.shutdownAndAwaitTermination();
assertEquals(base, run);
}
private static class IntBox {
int value;
}
}

View File

@@ -69,6 +69,10 @@ As above, for custom InfoHandler implementations.
`coreLoadThreads`::
Specifies the number of threads that will be assigned to load cores in parallel.
`replayUpdatesThreads`::
Specifies the number of threads used to replay buffered transaction log updates in parallel; the default is the number of available processors.
This thread pool is shared by all cores on the node.
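For example, the following `solr.xml` fragment (the value is illustrative) caps the shared replay pool at four threads:

[source,xml]
----
<solr>
  <int name="replayUpdatesThreads">4</int>
</solr>
----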
`coreRootDirectory`::
The root of the core discovery tree, defaults to `$SOLR_HOME` (by default, `server/solr`).

View File

@@ -126,6 +126,13 @@ public class ExecutorUtil {
threadFactory);
}
public static ExecutorService newMDCAwareCachedThreadPool(int maxThreads, ThreadFactory threadFactory) {
return new MDCAwareThreadPoolExecutor(0, maxThreads,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(maxThreads),
threadFactory);
}
@SuppressForbidden(reason = "class customizes ThreadPoolExecutor so it can be used instead")
public static class MDCAwareThreadPoolExecutor extends ThreadPoolExecutor {