mirror of https://github.com/apache/lucene.git
SOLR-12833: Add configurable timeout to VersionBucket lock.
This commit is contained in:
parent
b4e1fe4393
commit
b9a966e5f7
|
@ -163,6 +163,8 @@ Improvements
|
|||
|
||||
* SOLR-12804: Remove static modifier from Overseer queue access. (Mark Miller)
|
||||
|
||||
* SOLR-12833: Add configurable timeout to VersionBucket lock. (Jeffery Yuan, Mark Miller)
|
||||
|
||||
Other Changes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -16,12 +16,26 @@
|
|||
*/
|
||||
package org.apache.solr.update;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
// TODO: make inner?
|
||||
// TODO: store the highest possible in the index on a commit (but how to not block adds?)
|
||||
// TODO: could also store highest possible in the transaction log after a commit.
|
||||
// Or on a new index, just scan "version" for the max?
|
||||
/** @lucene.internal */
|
||||
public class VersionBucket {
|
||||
private int lockTimeoutMs;
|
||||
|
||||
public VersionBucket(int lockTimeoutMs) {
|
||||
this.lockTimeoutMs = lockTimeoutMs;
|
||||
}
|
||||
|
||||
private final Lock lock = new ReentrantLock(true);
|
||||
private final Condition condition = lock.newCondition();
|
||||
|
||||
public long highest;
|
||||
|
||||
public void updateHighest(long val) {
|
||||
|
@ -29,4 +43,34 @@ public class VersionBucket {
|
|||
highest = Math.max(highest, Math.abs(val));
|
||||
}
|
||||
}
|
||||
|
||||
public int getLockTimeoutMs() {
|
||||
return lockTimeoutMs;
|
||||
}
|
||||
|
||||
public boolean tryLock() {
|
||||
try {
|
||||
return lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void unlock() {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
public void signalAll() {
|
||||
condition.signalAll();
|
||||
}
|
||||
|
||||
public void awaitNanos(long nanosTimeout) {
|
||||
try {
|
||||
condition.awaitNanos(nanosTimeout);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,8 +45,13 @@ import org.slf4j.LoggerFactory;
|
|||
import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
|
||||
|
||||
public class VersionInfo {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private static final String SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS = "bucketVersionLockTimeoutMs";
|
||||
|
||||
/**
|
||||
* same as default client read timeout: 10 mins
|
||||
*/
|
||||
private static final int DEFAULT_VERSION_BUCKET_LOCK_TIMEOUT_MS = 600000;
|
||||
|
||||
private final UpdateLog ulog;
|
||||
private final VersionBucket[] buckets;
|
||||
|
@ -54,6 +59,8 @@ public class VersionInfo {
|
|||
private SchemaField idField;
|
||||
final ReadWriteLock lock = new ReentrantReadWriteLock(true);
|
||||
|
||||
private int versionBucketLockTimeoutMs;
|
||||
|
||||
/**
|
||||
* Gets and returns the {@link org.apache.solr.common.params.CommonParams#VERSION_FIELD} from the specified
|
||||
* schema, after verifying that it is indexed, stored, and single-valued.
|
||||
|
@ -94,9 +101,11 @@ public class VersionInfo {
|
|||
IndexSchema schema = ulog.uhandler.core.getLatestSchema();
|
||||
versionField = getAndCheckVersionField(schema);
|
||||
idField = schema.getUniqueKeyField();
|
||||
versionBucketLockTimeoutMs = ulog.uhandler.core.getSolrConfig().getInt("updateHandler/versionBucketLockTimeoutMs",
|
||||
Integer.parseInt(System.getProperty(SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS, "" + DEFAULT_VERSION_BUCKET_LOCK_TIMEOUT_MS)));
|
||||
buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ];
|
||||
for (int i=0; i<buckets.length; i++) {
|
||||
buckets[i] = new VersionBucket();
|
||||
buckets[i] = new VersionBucket(versionBucketLockTimeoutMs);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1010,16 +1010,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
|
||||
if (vinfo == null) {
|
||||
if (AtomicUpdateDocumentMerger.isAtomicUpdate(cmd)) {
|
||||
throw new SolrException
|
||||
(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Atomic document updates are not supported unless <updateLog/> is configured");
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Atomic document updates are not supported unless <updateLog/> is configured");
|
||||
} else {
|
||||
super.processAdd(cmd);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here)
|
||||
// This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash
|
||||
// here)
|
||||
int bucketHash = bucketHash(idBytes);
|
||||
|
||||
// at this point, there is an update we need to try and apply.
|
||||
|
@ -1059,9 +1059,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
}
|
||||
|
||||
vinfo.lockForUpdate();
|
||||
try {
|
||||
synchronized (bucket) {
|
||||
bucket.notifyAll(); //just in case anyone is waiting let them know that we have a new update
|
||||
if (bucket.tryLock()) {
|
||||
try {
|
||||
bucket.signalAll();
|
||||
// just in case anyone is waiting let them know that we have a new update
|
||||
// we obtain the version when synchronized and then do the add so we can ensure that
|
||||
// if version1 < version2 then version1 is actually added before version2.
|
||||
|
||||
|
@ -1100,15 +1101,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
if (versionOnUpdate != 0) {
|
||||
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
|
||||
long foundVersion = lastVersion == null ? -1 : lastVersion;
|
||||
if ( versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0) || (versionOnUpdate==1 && foundVersion > 0) ) {
|
||||
if (versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0)
|
||||
|| (versionOnUpdate == 1 && foundVersion > 0)) {
|
||||
// we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
|
||||
// specified it must exist (versionOnUpdate==1) and it does.
|
||||
} else {
|
||||
throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId() + " expected=" + versionOnUpdate + " actual=" + foundVersion);
|
||||
throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId()
|
||||
+ " expected=" + versionOnUpdate + " actual=" + foundVersion);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
long version = vinfo.getNewClock();
|
||||
cmd.setVersion(version);
|
||||
cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version);
|
||||
|
@ -1131,25 +1133,28 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
// this was checked for (in waitForDependentUpdates()) before entering the synchronized block.
|
||||
// So we shouldn't be here, unless what must've happened is:
|
||||
// by the time synchronization block was entered, the prev update was deleted by DBQ. Since
|
||||
// now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version
|
||||
// from the deleted list (which might be older than the prev update!)
|
||||
// now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version
|
||||
// from the deleted list (which might be older than the prev update!)
|
||||
UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate);
|
||||
|
||||
if (fetchedFromLeader instanceof DeleteUpdateCommand) {
|
||||
log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document"
|
||||
+ " was deleted at the leader subsequently.", idBytes.utf8ToString());
|
||||
versionDelete((DeleteUpdateCommand)fetchedFromLeader);
|
||||
versionDelete((DeleteUpdateCommand) fetchedFromLeader);
|
||||
return true;
|
||||
} else {
|
||||
assert fetchedFromLeader instanceof AddUpdateCommand;
|
||||
// Newer document was fetched from the leader. Apply that document instead of this current in-place update.
|
||||
log.info("In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}",
|
||||
// Newer document was fetched from the leader. Apply that document instead of this current in-place
|
||||
// update.
|
||||
log.info(
|
||||
"In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}",
|
||||
idBytes.utf8ToString(), fetchedFromLeader);
|
||||
|
||||
// Make this update to become a non-inplace update containing the full document obtained from the leader
|
||||
cmd.solrDoc = ((AddUpdateCommand)fetchedFromLeader).solrDoc;
|
||||
// Make this update to become a non-inplace update containing the full document obtained from the
|
||||
// leader
|
||||
cmd.solrDoc = ((AddUpdateCommand) fetchedFromLeader).solrDoc;
|
||||
cmd.prevVersion = -1;
|
||||
cmd.setVersion((long)cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD));
|
||||
cmd.setVersion((long) cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD));
|
||||
assert cmd.isInPlaceUpdate() == false;
|
||||
}
|
||||
} else {
|
||||
|
@ -1173,11 +1178,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
// in this bucket so far, so we know that no reordering has yet occurred.
|
||||
bucket.updateHighest(versionOnUpdate);
|
||||
} else {
|
||||
// there have been updates higher than the current update. we need to check
|
||||
// there have been updates higher than the current update. we need to check
|
||||
// the specific version for this id.
|
||||
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
|
||||
if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
|
||||
// This update is a repeat, or was reordered. We need to drop this update.
|
||||
// This update is a repeat, or was reordered. We need to drop this update.
|
||||
log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
|
||||
return true;
|
||||
}
|
||||
|
@ -1188,9 +1193,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
boolean willDistrib = isLeader && nodes != null && nodes.size() > 0;
|
||||
|
||||
|
||||
SolrInputDocument clonedDoc = null;
|
||||
if (willDistrib && cloneRequiredOnLeader) {
|
||||
clonedDoc = cmd.solrDoc.deepCopy();
|
||||
|
@ -1198,16 +1203,22 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
|
||||
// TODO: possibly set checkDeleteByQueries as a flag on the command?
|
||||
doLocalAdd(cmd);
|
||||
|
||||
|
||||
if (willDistrib && cloneRequiredOnLeader) {
|
||||
cmd.solrDoc = clonedDoc;
|
||||
}
|
||||
} finally {
|
||||
|
||||
} // end synchronized (bucket)
|
||||
} finally {
|
||||
vinfo.unlockForUpdate();
|
||||
bucket.unlock();
|
||||
|
||||
vinfo.unlockForUpdate();
|
||||
}
|
||||
return false;
|
||||
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -1236,31 +1247,31 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
TimeOut waitTimeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
||||
|
||||
vinfo.lockForUpdate();
|
||||
try {
|
||||
synchronized (bucket) {
|
||||
if (bucket.tryLock()) {
|
||||
try {
|
||||
Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
|
||||
lastFoundVersion = lookedUpVersion == null ? 0L: lookedUpVersion;
|
||||
lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
|
||||
|
||||
if (Math.abs(lastFoundVersion) < cmd.prevVersion) {
|
||||
log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}",
|
||||
(cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion, isReplayOrPeersync, cmd.getPrintableId());
|
||||
log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}",
|
||||
(cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion,
|
||||
isReplayOrPeersync, cmd.getPrintableId());
|
||||
}
|
||||
|
||||
while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) {
|
||||
try {
|
||||
long timeLeft = waitTimeout.timeLeft(TimeUnit.MILLISECONDS);
|
||||
if (timeLeft > 0) { // wait(0) waits forever until notified, but we don't want that.
|
||||
bucket.wait(timeLeft);
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
throw new RuntimeException(ie);
|
||||
}
|
||||
while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) {
|
||||
bucket.awaitNanos(waitTimeout.timeLeft(TimeUnit.NANOSECONDS));
|
||||
lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
|
||||
lastFoundVersion = lookedUpVersion == null ? 0L: lookedUpVersion;
|
||||
lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
|
||||
}
|
||||
} finally {
|
||||
|
||||
bucket.unlock();
|
||||
|
||||
vinfo.unlockForUpdate();
|
||||
}
|
||||
} finally {
|
||||
vinfo.unlockForUpdate();
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
|
||||
}
|
||||
|
||||
if (Math.abs(lastFoundVersion) > cmd.prevVersion) {
|
||||
|
@ -1793,7 +1804,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
return false;
|
||||
}
|
||||
|
||||
// This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here)
|
||||
// This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash
|
||||
// here)
|
||||
int bucketHash = bucketHash(idBytes);
|
||||
|
||||
// at this point, there is an update we need to try and apply.
|
||||
|
@ -1806,22 +1818,21 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
|
||||
}
|
||||
long signedVersionOnUpdate = versionOnUpdate;
|
||||
versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version
|
||||
versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version
|
||||
|
||||
boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
|
||||
boolean leaderLogic = isLeader && !isReplayOrPeersync;
|
||||
boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null;
|
||||
|
||||
if (!leaderLogic && versionOnUpdate==0) {
|
||||
if (!leaderLogic && versionOnUpdate == 0) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
|
||||
}
|
||||
|
||||
VersionBucket bucket = vinfo.bucket(bucketHash);
|
||||
|
||||
vinfo.lockForUpdate();
|
||||
try {
|
||||
|
||||
synchronized (bucket) {
|
||||
if (bucket.tryLock()) {
|
||||
try {
|
||||
if (versionsStored) {
|
||||
long bucketVersion = bucket.highest;
|
||||
|
||||
|
@ -1847,11 +1858,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
if (signedVersionOnUpdate != 0) {
|
||||
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
|
||||
long foundVersion = lastVersion == null ? -1 : lastVersion;
|
||||
if ( (signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0) || (signedVersionOnUpdate == 1 && foundVersion > 0) ) {
|
||||
if ((signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0)
|
||||
|| (signedVersionOnUpdate == 1 && foundVersion > 0)) {
|
||||
// we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
|
||||
// specified it must exist (versionOnUpdate==1) and it does.
|
||||
} else {
|
||||
throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected=" + signedVersionOnUpdate + " actual=" + foundVersion);
|
||||
throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected="
|
||||
+ signedVersionOnUpdate + " actual=" + foundVersion);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1874,11 +1887,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
// in this bucket so far, so we know that no reordering has yet occured.
|
||||
bucket.updateHighest(versionOnUpdate);
|
||||
} else {
|
||||
// there have been updates higher than the current update. we need to check
|
||||
// there have been updates higher than the current update. we need to check
|
||||
// the specific version for this id.
|
||||
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
|
||||
if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
|
||||
// This update is a repeat, or was reordered. We need to drop this update.
|
||||
// This update is a repeat, or was reordered. We need to drop this update.
|
||||
log.debug("Dropping delete update due to version {}", idBytes.utf8ToString());
|
||||
return true;
|
||||
}
|
||||
|
@ -1892,10 +1905,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
|
||||
doLocalDelete(cmd);
|
||||
return false;
|
||||
} // end synchronized (bucket)
|
||||
|
||||
} finally {
|
||||
vinfo.unlockForUpdate();
|
||||
} finally {
|
||||
bucket.unlock();
|
||||
vinfo.unlockForUpdate();
|
||||
}
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue