SOLR-12833: Avoid unnecessary memory cost when DistributedUpdateProcessor timed-out lock is not used.

This commit is contained in:
Andrzej Bialecki 2019-04-30 22:31:06 +02:00
parent 16ca0c35f6
commit 70e090717d
7 changed files with 540 additions and 286 deletions

View File

@ -266,6 +266,9 @@ Improvements
* SOLR-13432: Add .toString methods to BitDocSet and SortedIntDocSet so that enabling "showItems" on the filter caches
shows some useful information about the values in the cache. (shalin)
* SOLR-12833: Avoid unnecessary memory cost when DistributedUpdateProcessor timed-out lock is not used.
(jefferyyuan, ab)
Other Changes
----------------------

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.common.SolrException;
/** @lucene.internal */
/**
* This implementation uses lock and condition and will throw exception if it can't obtain the lock within
* <code>lockTimeoutMs</code>.
*/
public class TimedVersionBucket extends VersionBucket {
private final Lock lock = new ReentrantLock(true);
private final Condition condition = lock.newCondition();
/**
* This will run the function with the lock. It will throw exception if it can't obtain the lock within
* <code>lockTimeoutMs</code>.
*/
@Override
public <T,R> R runWithLock(int lockTimeoutMs, CheckedFunction<T,R> function) throws IOException {
if (tryLock(lockTimeoutMs)) {
return function.apply();
} else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to get version bucket lock in " + lockTimeoutMs + " ms");
}
}
public void unlock() {
lock.unlock();
}
public void signalAll() {
condition.signalAll();
}
public void awaitNanos(long nanosTimeout) {
try {
condition.awaitNanos(nanosTimeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
protected boolean tryLock(int lockTimeoutMs) {
try {
return lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}

View File

@ -16,26 +16,19 @@
*/
package org.apache.solr.update;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
// TODO: make inner?
// TODO: store the highest possible in the index on a commit (but how to not block adds?)
// TODO: could also store highest possible in the transaction log after a commit.
// Or on a new index, just scan "version" for the max?
/** @lucene.internal */
/**
* The default implementation which uses the intrinsic object monitor.
* It uses less memory but ignores the <code>lockTimeoutMs</code>.
*/
public class VersionBucket {
private int lockTimeoutMs;
public VersionBucket(int lockTimeoutMs) {
this.lockTimeoutMs = lockTimeoutMs;
}
private final Lock lock = new ReentrantLock(true);
private final Condition condition = lock.newCondition();
public long highest;
public void updateHighest(long val) {
@ -44,32 +37,34 @@ public class VersionBucket {
}
}
public int getLockTimeoutMs() {
return lockTimeoutMs;
@FunctionalInterface
public interface CheckedFunction<T, R> {
R apply() throws IOException;
}
public boolean tryLock() {
try {
return lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
/**
* This will run the function with the intrinsic object monitor.
*/
public <T, R> R runWithLock(int lockTimeoutMs, CheckedFunction<T, R> function) throws IOException {
synchronized (this) {
return function.apply();
}
}
/**
* Nothing to do for the intrinsic object monitor.
*/
public void unlock() {
lock.unlock();
}
public void signalAll() {
condition.signalAll();
notifyAll();
}
public void awaitNanos(long nanosTimeout) {
try {
condition.awaitNanos(nanosTimeout);
wait(TimeUnit.NANOSECONDS.toMillis(nanosTimeout));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

View File

@ -48,15 +48,9 @@ public class VersionInfo {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS = "bucketVersionLockTimeoutMs";
/**
* same as default client read timeout: 10 mins
*/
private static final int DEFAULT_VERSION_BUCKET_LOCK_TIMEOUT_MS = 600000;
private final UpdateLog ulog;
private final VersionBucket[] buckets;
private SchemaField versionField;
private SchemaField idField;
final ReadWriteLock lock = new ReentrantReadWriteLock(true);
private int versionBucketLockTimeoutMs;
@ -100,14 +94,21 @@ public class VersionInfo {
this.ulog = ulog;
IndexSchema schema = ulog.uhandler.core.getLatestSchema();
versionField = getAndCheckVersionField(schema);
idField = schema.getUniqueKeyField();
versionBucketLockTimeoutMs = ulog.uhandler.core.getSolrConfig().getInt("updateHandler/versionBucketLockTimeoutMs",
Integer.parseInt(System.getProperty(SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS, "" + DEFAULT_VERSION_BUCKET_LOCK_TIMEOUT_MS)));
Integer.parseInt(System.getProperty(SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS, "0")));
buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ];
for (int i=0; i<buckets.length; i++) {
buckets[i] = new VersionBucket(versionBucketLockTimeoutMs);
if (versionBucketLockTimeoutMs > 0) {
buckets[i] = new TimedVersionBucket();
} else {
buckets[i] = new VersionBucket();
}
}
}
public int getVersionBucketLockTimeoutMs() {
return versionBucketLockTimeoutMs;
}
public void reload() {
}

View File

@ -124,7 +124,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private final AtomicUpdateDocumentMerger docMerger;
private final UpdateLog ulog;
private final VersionInfo vinfo;
@VisibleForTesting
VersionInfo vinfo;
private final boolean versionsStored;
private boolean returnVersions;
@ -331,166 +332,170 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
vinfo.lockForUpdate();
if (bucket.tryLock()) {
try {
bucket.signalAll();
// just in case anyone is waiting let them know that we have a new update
// we obtain the version when synchronized and then do the add so we can ensure that
// if version1 < version2 then version1 is actually added before version2.
try {
long finalVersionOnUpdate = versionOnUpdate;
return bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doVersionAdd(cmd, finalVersionOnUpdate, isReplayOrPeersync, leaderLogic, forwardedFromCollection, bucket));
} finally {
vinfo.unlockForUpdate();
}
}
// even if we don't store the version field, synchronizing on the bucket
// will enable us to know what version happened first, and thus enable
// realtime-get to work reliably.
// TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
// there may be other reasons in the future for a version on the commands
private boolean doVersionAdd(AddUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync,
boolean leaderLogic, boolean forwardedFromCollection, VersionBucket bucket) throws IOException {
try {
BytesRef idBytes = cmd.getIndexedId();
bucket.signalAll();
// just in case anyone is waiting let them know that we have a new update
// we obtain the version when synchronized and then do the add so we can ensure that
// if version1 < version2 then version1 is actually added before version2.
if (versionsStored) {
// even if we don't store the version field, synchronizing on the bucket
// will enable us to know what version happened first, and thus enable
// realtime-get to work reliably.
// TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
// there may be other reasons in the future for a version on the commands
long bucketVersion = bucket.highest;
if (versionsStored) {
if (leaderLogic) {
long bucketVersion = bucket.highest;
if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
// forwarded from a collection but we are not buffering so strip original version and apply our own
// see SOLR-5308
log.info("Removing version field from doc: " + cmd.getPrintableId());
cmd.solrDoc.remove(CommonParams.VERSION_FIELD);
versionOnUpdate = 0;
}
if (leaderLogic) {
getUpdatedDocument(cmd, versionOnUpdate);
if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
// forwarded from a collection but we are not buffering so strip original version and apply our own
// see SOLR-5308
log.info("Removing version field from doc: " + cmd.getPrintableId());
cmd.solrDoc.remove(CommonParams.VERSION_FIELD);
versionOnUpdate = 0;
}
// leaders can also be in buffering state during "migrate" API call, see SOLR-5308
if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
&& isReplayOrPeersync == false) {
// we're not in an active state, and this update isn't from a replay, so buffer it.
log.info("Leader logic applied but update log is buffering: " + cmd.getPrintableId());
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.add(cmd);
return true;
}
getUpdatedDocument(cmd, versionOnUpdate);
if (versionOnUpdate != 0) {
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
long foundVersion = lastVersion == null ? -1 : lastVersion;
if (versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0)
|| (versionOnUpdate == 1 && foundVersion > 0)) {
// we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
// specified it must exist (versionOnUpdate==1) and it does.
} else {
throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId()
+ " expected=" + versionOnUpdate + " actual=" + foundVersion);
}
}
// leaders can also be in buffering state during "migrate" API call, see SOLR-5308
if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
&& isReplayOrPeersync == false) {
// we're not in an active state, and this update isn't from a replay, so buffer it.
log.info("Leader logic applied but update log is buffering: " + cmd.getPrintableId());
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.add(cmd);
return true;
}
long version = vinfo.getNewClock();
cmd.setVersion(version);
cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version);
bucket.updateHighest(version);
} else {
// The leader forwarded us this update.
cmd.setVersion(versionOnUpdate);
if (shouldBufferUpdate(cmd, isReplayOrPeersync, ulog.getState())) {
// we're not in an active state, and this update isn't from a replay, so buffer it.
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.add(cmd);
return true;
}
if (cmd.isInPlaceUpdate()) {
long prev = cmd.prevVersion;
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
if (lastVersion == null || Math.abs(lastVersion) < prev) {
// this was checked for (in waitForDependentUpdates()) before entering the synchronized block.
// So we shouldn't be here, unless what must've happened is:
// by the time synchronization block was entered, the prev update was deleted by DBQ. Since
// now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version
// from the deleted list (which might be older than the prev update!)
UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate);
if (fetchedFromLeader instanceof DeleteUpdateCommand) {
log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document"
+ " was deleted at the leader subsequently.", idBytes.utf8ToString());
versionDelete((DeleteUpdateCommand) fetchedFromLeader);
return true;
} else {
assert fetchedFromLeader instanceof AddUpdateCommand;
// Newer document was fetched from the leader. Apply that document instead of this current in-place
// update.
log.info(
"In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}",
idBytes.utf8ToString(), fetchedFromLeader);
// Make this update to become a non-inplace update containing the full document obtained from the
// leader
cmd.solrDoc = ((AddUpdateCommand) fetchedFromLeader).solrDoc;
cmd.prevVersion = -1;
cmd.setVersion((long) cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD));
assert cmd.isInPlaceUpdate() == false;
}
} else {
if (lastVersion != null && Math.abs(lastVersion) > prev) {
// this means we got a newer full doc update and in that case it makes no sense to apply the older
// inplace update. Drop this update
log.info("Update was applied on version: " + prev + ", but last version I have is: " + lastVersion
+ ". Dropping current update.");
return true;
} else {
// We're good, we should apply this update. First, update the bucket's highest.
if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
bucket.updateHighest(versionOnUpdate);
}
}
}
if (versionOnUpdate != 0) {
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
long foundVersion = lastVersion == null ? -1 : lastVersion;
if (versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0)
|| (versionOnUpdate == 1 && foundVersion > 0)) {
// we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
// specified it must exist (versionOnUpdate==1) and it does.
} else {
// if we aren't the leader, then we need to check that updates were not re-ordered
if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
// we're OK... this update has a version higher than anything we've seen
// in this bucket so far, so we know that no reordering has yet occurred.
bucket.updateHighest(versionOnUpdate);
} else {
// there have been updates higher than the current update. we need to check
// the specific version for this id.
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
// This update is a repeat, or was reordered. We need to drop this update.
log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
return true;
}
}
}
if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId()
+ " expected=" + versionOnUpdate + " actual=" + foundVersion);
}
}
long version = vinfo.getNewClock();
cmd.setVersion(version);
cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version);
bucket.updateHighest(version);
} else {
// The leader forwarded us this update.
cmd.setVersion(versionOnUpdate);
if (shouldBufferUpdate(cmd, isReplayOrPeersync, ulog.getState())) {
// we're not in an active state, and this update isn't from a replay, so buffer it.
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.add(cmd);
return true;
}
if (cmd.isInPlaceUpdate()) {
long prev = cmd.prevVersion;
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
if (lastVersion == null || Math.abs(lastVersion) < prev) {
// this was checked for (in waitForDependentUpdates()) before entering the synchronized block.
// So we shouldn't be here, unless what must've happened is:
// by the time synchronization block was entered, the prev update was deleted by DBQ. Since
// now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version
// from the deleted list (which might be older than the prev update!)
UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate);
if (fetchedFromLeader instanceof DeleteUpdateCommand) {
log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document"
+ " was deleted at the leader subsequently.", idBytes.utf8ToString());
versionDelete((DeleteUpdateCommand) fetchedFromLeader);
return true;
} else {
assert fetchedFromLeader instanceof AddUpdateCommand;
// Newer document was fetched from the leader. Apply that document instead of this current in-place
// update.
log.info(
"In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}",
idBytes.utf8ToString(), fetchedFromLeader);
// Make this update to become a non-inplace update containing the full document obtained from the
// leader
cmd.solrDoc = ((AddUpdateCommand) fetchedFromLeader).solrDoc;
cmd.prevVersion = -1;
cmd.setVersion((long) cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD));
assert cmd.isInPlaceUpdate() == false;
}
} else {
if (lastVersion != null && Math.abs(lastVersion) > prev) {
// this means we got a newer full doc update and in that case it makes no sense to apply the older
// inplace update. Drop this update
log.info("Update was applied on version: " + prev + ", but last version I have is: " + lastVersion
+ ". Dropping current update.");
return true;
} else {
// We're good, we should apply this update. First, update the bucket's highest.
if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
bucket.updateHighest(versionOnUpdate);
}
}
}
} else {
// if we aren't the leader, then we need to check that updates were not re-ordered
if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
// we're OK... this update has a version higher than anything we've seen
// in this bucket so far, so we know that no reordering has yet occurred.
bucket.updateHighest(versionOnUpdate);
} else {
// there have been updates higher than the current update. we need to check
// the specific version for this id.
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
// This update is a repeat, or was reordered. We need to drop this update.
log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
return true;
}
}
}
if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
}
SolrInputDocument clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy(): null;
// TODO: possibly set checkDeleteByQueries as a flag on the command?
doLocalAdd(cmd);
// if the update updates a doc that is part of a nested structure,
// force open a realTimeSearcher to trigger a ulog cache refresh.
// This refresh makes RTG handler aware of this update.q
if(req.getSchema().isUsableForChildDocs() && shouldRefreshUlogCaches(cmd)) {
ulog.openRealtimeSearcher();
}
if (clonedDoc != null) {
cmd.solrDoc = clonedDoc;
}
} finally {
bucket.unlock();
vinfo.unlockForUpdate();
}
return false;
} else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
SolrInputDocument clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy(): null;
// TODO: possibly set checkDeleteByQueries as a flag on the command?
doLocalAdd(cmd);
// if the update updates a doc that is part of a nested structure,
// force open a realTimeSearcher to trigger a ulog cache refresh.
// This refresh makes RTG handler aware of this update.q
if(req.getSchema().isUsableForChildDocs() && shouldRefreshUlogCaches(cmd)) {
ulog.openRealtimeSearcher();
}
if (clonedDoc != null) {
cmd.solrDoc = clonedDoc;
}
} finally {
bucket.unlock();
}
return false;
}
/**
@ -527,31 +532,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
TimeOut waitTimeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
vinfo.lockForUpdate();
if (bucket.tryLock()) {
try {
Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
if (Math.abs(lastFoundVersion) < cmd.prevVersion) {
log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}",
(cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion,
isReplayOrPeersync, cmd.getPrintableId());
}
while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) {
bucket.awaitNanos(waitTimeout.timeLeft(TimeUnit.NANOSECONDS));
lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
}
} finally {
bucket.unlock();
vinfo.unlockForUpdate();
}
} else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
try {
lastFoundVersion = bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doWaitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket, waitTimeout));
} finally {
vinfo.unlockForUpdate();
}
if (Math.abs(lastFoundVersion) > cmd.prevVersion) {
@ -590,6 +574,33 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return missingUpdate.getVersion();
}
private long doWaitForDependentUpdates(AddUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync, VersionBucket bucket,
TimeOut waitTimeout) {
long lastFoundVersion;
try {
Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
if (Math.abs(lastFoundVersion) < cmd.prevVersion) {
log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}",
(cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion,
isReplayOrPeersync, cmd.getPrintableId());
}
while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) {
long timeLeftInNanos = waitTimeout.timeLeft(TimeUnit.NANOSECONDS);
if(timeLeftInNanos > 0) { // 0 means: wait forever until notified, but we don't want that.
bucket.awaitNanos(timeLeftInNanos);
}
lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
}
} finally {
bucket.unlock();
}
return lastFoundVersion;
}
/**
* This method is used when an update on which a particular in-place update has been lost for some reason. This method
* sends a request to the shard leader to fetch the latest full document as seen on the leader.
@ -938,87 +949,94 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
VersionBucket bucket = vinfo.bucket(bucketHash);
vinfo.lockForUpdate();
if (bucket.tryLock()) {
try {
if (versionsStored) {
long bucketVersion = bucket.highest;
try {
long finalVersionOnUpdate = versionOnUpdate;
return bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doVersionDelete(cmd, finalVersionOnUpdate, signedVersionOnUpdate, isReplayOrPeersync, leaderLogic,
forwardedFromCollection, bucket));
} finally {
vinfo.unlockForUpdate();
}
}
if (leaderLogic) {
private boolean doVersionDelete(DeleteUpdateCommand cmd, long versionOnUpdate, long signedVersionOnUpdate,
boolean isReplayOrPeersync, boolean leaderLogic, boolean forwardedFromCollection, VersionBucket bucket)
throws IOException {
try {
BytesRef idBytes = cmd.getIndexedId();
if (versionsStored) {
long bucketVersion = bucket.highest;
if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
// forwarded from a collection but we are not buffering so strip original version and apply our own
// see SOLR-5308
log.info("Removing version field from doc: " + cmd.getId());
versionOnUpdate = signedVersionOnUpdate = 0;
}
if (leaderLogic) {
// leaders can also be in buffering state during "migrate" API call, see SOLR-5308
if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
&& !isReplayOrPeersync) {
// we're not in an active state, and this update isn't from a replay, so buffer it.
log.info("Leader logic applied but update log is buffering: " + cmd.getId());
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.delete(cmd);
return true;
}
if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
// forwarded from a collection but we are not buffering so strip original version and apply our own
// see SOLR-5308
log.info("Removing version field from doc: " + cmd.getId());
versionOnUpdate = signedVersionOnUpdate = 0;
}
if (signedVersionOnUpdate != 0) {
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
long foundVersion = lastVersion == null ? -1 : lastVersion;
if ((signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0)
|| (signedVersionOnUpdate == 1 && foundVersion > 0)) {
// we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
// specified it must exist (versionOnUpdate==1) and it does.
} else {
throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected="
+ signedVersionOnUpdate + " actual=" + foundVersion);
}
}
// leaders can also be in buffering state during "migrate" API call, see SOLR-5308
if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
&& !isReplayOrPeersync) {
// we're not in an active state, and this update isn't from a replay, so buffer it.
log.info("Leader logic applied but update log is buffering: " + cmd.getId());
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.delete(cmd);
return true;
}
long version = vinfo.getNewClock();
cmd.setVersion(-version);
bucket.updateHighest(version);
} else {
cmd.setVersion(-versionOnUpdate);
if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) {
// we're not in an active state, and this update isn't from a replay, so buffer it.
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.delete(cmd);
return true;
}
// if we aren't the leader, then we need to check that updates were not re-ordered
if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
// we're OK... this update has a version higher than anything we've seen
// in this bucket so far, so we know that no reordering has yet occurred.
bucket.updateHighest(versionOnUpdate);
if (signedVersionOnUpdate != 0) {
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
long foundVersion = lastVersion == null ? -1 : lastVersion;
if ((signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0)
|| (signedVersionOnUpdate == 1 && foundVersion > 0)) {
// we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
// specified it must exist (versionOnUpdate==1) and it does.
} else {
// there have been updates higher than the current update. we need to check
// the specific version for this id.
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
// This update is a repeat, or was reordered. We need to drop this update.
log.debug("Dropping delete update due to version {}", idBytes.utf8ToString());
return true;
}
}
if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected="
+ signedVersionOnUpdate + " actual=" + foundVersion);
}
}
}
doLocalDelete(cmd);
return false;
} finally {
bucket.unlock();
vinfo.unlockForUpdate();
long version = vinfo.getNewClock();
cmd.setVersion(-version);
bucket.updateHighest(version);
} else {
cmd.setVersion(-versionOnUpdate);
if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) {
// we're not in an active state, and this update isn't from a replay, so buffer it.
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.delete(cmd);
return true;
}
// if we aren't the leader, then we need to check that updates were not re-ordered
if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
// we're OK... this update has a version higher than anything we've seen
// in this bucket so far, so we know that no reordering has yet occurred.
bucket.updateHighest(versionOnUpdate);
} else {
// there have been updates higher than the current update. we need to check
// the specific version for this id.
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
// This update is a repeat, or was reordered. We need to drop this update.
log.debug("Dropping delete update due to version {}", idBytes.utf8ToString());
return true;
}
}
if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
}
}
} else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
doLocalDelete(cmd);
return false;
} finally {
bucket.unlock();
}
}

View File

@ -17,36 +17,173 @@
package org.apache.solr.update.processor;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doReturn;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.TimedVersionBucket;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.VersionInfo;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
@Rule
public MockitoRule rule = MockitoJUnit.rule();
private static ExecutorService executor;
@BeforeClass
public static void beforeClass() throws Exception {
initCore("solr/collection1/conf/solrconfig.xml","solr/collection1/conf/schema-minimal.xml");
executor = Executors.newCachedThreadPool();
initCore("solr/collection1/conf/solrconfig.xml","solr/collection1/conf/schema-minimal-with-another-uniqkey.xml");
}
@AfterClass
public static void AfterClass() {
executor.shutdown();
}
@Test
public void testShouldBufferUpdateZk() {
public void testShouldBufferUpdateZk() throws IOException {
SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams());
DistributedUpdateProcessor processor = new DistributedUpdateProcessor(
req, null, null, null);
AddUpdateCommand cmd = new AddUpdateCommand(req);
// applying buffer updates, isReplayOrPeerSync flag doesn't matter
assertFalse(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
assertFalse(processor.shouldBufferUpdate(cmd, true, UpdateLog.State.APPLYING_BUFFERED));
try (DistributedUpdateProcessor processor = new DistributedUpdateProcessor(
req, null, null, null)) {
AddUpdateCommand cmd = new AddUpdateCommand(req);
// applying buffer updates, isReplayOrPeerSync flag doesn't matter
assertFalse(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
assertFalse(processor.shouldBufferUpdate(cmd, true, UpdateLog.State.APPLYING_BUFFERED));
assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.BUFFERING));
// this is not an buffer updates and it depend on other updates
cmd.prevVersion = 10;
assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
}
}
@Test
public void testVersionAdd() throws IOException {
SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams());
int threads = 5;
Function<DistributedUpdateProcessor,Boolean> versionAddFunc = (DistributedUpdateProcessor process) -> {
try {
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.solrDoc = new SolrInputDocument();
cmd.solrDoc.setField("notid", "10");
return process.versionAdd(cmd);
} catch (IOException e) {
throw new RuntimeException(e);
}
};
int succeeded = runCommands(threads, 1000, req, versionAddFunc);
// only one should succeed
assertThat(succeeded, is(1));
assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.BUFFERING));
// this is not an buffer updates and it depend on other updates
cmd.prevVersion = 10;
assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
succeeded = runCommands(threads, -1, req, versionAddFunc);
// all should succeed
assertThat(succeeded, is(threads));
}
@Test
public void testVersionDelete() throws IOException {
SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams());
int threads = 5;
Function<DistributedUpdateProcessor,Boolean> versionDeleteFunc = (DistributedUpdateProcessor process) -> {
try {
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
cmd.id = "1";
return process.versionDelete(cmd);
} catch (IOException e) {
throw new RuntimeException(e);
}
};
int succeeded = runCommands(threads, 1000, req, versionDeleteFunc);
// only one should succeed
assertThat(succeeded, is(1));
succeeded = runCommands(threads, -1, req, versionDeleteFunc);
// all should succeed
assertThat(succeeded, is(threads));
}
/**
* @return how many requests succeeded
*/
private int runCommands(int threads, int versionBucketLockTimeoutMs, SolrQueryRequest req,
Function<DistributedUpdateProcessor,Boolean> function)
throws IOException {
try (DistributedUpdateProcessor processor = new DistributedUpdateProcessor(
req, null, null, null)) {
if (versionBucketLockTimeoutMs > 0) {
// use TimedVersionBucket with versionBucketLockTimeoutMs
VersionInfo vinfo = Mockito.spy(processor.vinfo);
processor.vinfo = vinfo;
doReturn(new TimedVersionBucket() {
/**
* simulate the case: it takes 5 seconds to add the doc
*
*/
@Override
protected boolean tryLock(int lockTimeoutMs) {
boolean locked = super.tryLock(versionBucketLockTimeoutMs);
if (locked) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return locked;
}
}).when(vinfo).bucket(anyInt());
}
CountDownLatch latch = new CountDownLatch(1);
Collection<Future<Boolean>> futures = new ArrayList<>();
for (int t = 0; t < threads; ++t) {
futures.add(executor.submit(() -> {
latch.await();
return function.apply(processor);
}));
}
latch.countDown();
int succeeded = 0;
for (Future<Boolean> f : futures) {
try {
f.get();
succeeded++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
// do nothing
}
}
return succeeded;
}
}
}

View File

@ -131,3 +131,27 @@ An example, to be included under `<config><updateHandler>` in `solrconfig.xml`,
<int name="numVersionBuckets">65536</int>
</updateLog>
----
== Other options
In some cases complex updates (such as spatial/shape) may take very long time to complete. In the default
configuration other updates that fall into the same internal version bucket will wait indefinitely and
eventually these outstanding requests may pile up and lead to thread exhaustion and eventually to
OutOfMemory errors.
The option `versionBucketLockTimeoutMs` in the `updateHandler` section helps to prevent that by
specifying a limited timeout for such extremely long running update requests. If this limit
is reached this update will fail but it won't block forever all other updates. See SOLR-12833 for more details.
There's a memory cost associated with this setting. Values greater than the default 0 (meaning unlimited timeout)
cause Solr to use a different internal implementation of the version bucket, which increases memory consumption
from ~1.5MB to ~6.8MB per Solr core.
An example of specifying this option under `<config>` section of `solrconfig.xml`:
[source,xml]
----
<updateHandler class="solr.DirectUpdateHandler2">
...
<int name="versionBucketLockTimeoutMs">10000</int>
</updateHandler>
----