diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 180b94de809..6546f39882e 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -166,6 +166,9 @@ Bug Fixes * SOLR-14189: Switch from String.trim() to StringUtils.isBlank() in query parsers (Andy Webb via Uwe Schindler) +* SOLR-13897: Fix unsafe publication of Terms object in ZkShardTerms that can cause visibility issues + and race conditions under contention. (shalin) + Other Changes --------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java index 629f2bcd13c..671bb469e6e 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java @@ -47,6 +47,12 @@ class ZkCollectionTerms implements AutoCloseable { } } + public void register(String shardId, String coreNodeName) { + synchronized (terms) { + getShard(shardId).registerTerm(coreNodeName); + } + } + public void remove(String shardId, CoreDescriptor coreDescriptor) { synchronized (terms) { if (getShard(shardId).removeTerm(coreDescriptor)) { diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 9d7d9be6f73..6db0c5ae97d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1204,12 +1204,13 @@ public class ZkController implements Closeable { throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, replica is removed from clusterstate"); } - ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId()); if (replica.getType() != Type.PULL) { - shardTerms.registerTerm(coreZkNodeName); + getCollectionTerms(collection).register(cloudDesc.getShardId(), coreZkNodeName); } + ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId()); + log.debug("Register replica - 
core:{} address:{} collection:{} shard:{}", coreName, baseUrl, collection, shardId); diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java index 0cab1a8231f..de1667015d3 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.solr.client.solrj.cloud.ShardTerms; import org.apache.solr.common.SolrException; @@ -65,7 +66,6 @@ public class ZkShardTerms implements AutoCloseable{ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final Object writingLock = new Object(); private final String collection; private final String shard; private final String znodePath; @@ -73,11 +73,24 @@ public class ZkShardTerms implements AutoCloseable{ private final Set listeners = new HashSet<>(); private final AtomicBoolean isClosed = new AtomicBoolean(false); - private ShardTerms terms; + private AtomicReference terms = new AtomicReference<>(); - // Listener of a core for shard's term change events + /** + * Listener of a core for shard's term change events + */ interface CoreTermWatcher { - // return true if the listener wanna to be triggered in the next time + /** + * Invoked with a Terms instance after update.

+ * Concurrent invocations of this method are not allowed, so at any given time only one thread + * will invoke this method. + *

+ * Note - there is no guarantee that the terms version will be strictly monotonic, i.e. + * an invocation with a newer terms version can be followed by an invocation with an older + * terms version. Implementations are required to be resilient to out-of-order invocations. + * + * @param terms the terms instance after the update + * @return true if the listener wants to be triggered the next time + */ boolean onTermChanged(ShardTerms terms); } @@ -101,13 +114,13 @@ public class ZkShardTerms implements AutoCloseable{ if (replicasNeedingRecovery.isEmpty()) return; ShardTerms newTerms; - while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) { + while( (newTerms = terms.get().increaseTerms(leader, replicasNeedingRecovery)) != null) { if (forceSaveTerms(newTerms)) return; } } public ShardTerms getShardTerms() { - return terms; + return terms.get(); } /** * Can this replica become leader? @@ -115,7 +128,7 @@ public class ZkShardTerms implements AutoCloseable{ * @return true if this replica can become leader, false if otherwise */ public boolean canBecomeLeader(String coreNodeName) { - return terms.canBecomeLeader(coreNodeName); + return terms.get().canBecomeLeader(coreNodeName); } /** @@ -124,7 +137,7 @@ public class ZkShardTerms implements AutoCloseable{ * @return true if this replica has term equals to leader's term, false if otherwise */ public boolean skipSendingUpdatesTo(String coreNodeName) { - return !terms.haveHighestTermValue(coreNodeName); + return !terms.get().haveHighestTermValue(coreNodeName); } /** @@ -133,7 +146,7 @@ public class ZkShardTerms implements AutoCloseable{ * @return true if this replica registered its term, false if otherwise */ public boolean registered(String coreNodeName) { - return terms.getTerm(coreNodeName) != null; + return terms.get().getTerm(coreNodeName) != null; } public void close() { @@ -147,9 +160,7 @@ public class ZkShardTerms implements AutoCloseable{ // package private for testing, only used by tests Map getTerms() { - 
synchronized (writingLock) { - return terms.getTerms(); - } + return new HashMap<>(terms.get().getTerms()); } /** @@ -169,7 +180,7 @@ public class ZkShardTerms implements AutoCloseable{ int numListeners; synchronized (listeners) { // solrcore already closed - listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms)); + listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms.get())); numListeners = listeners.size(); } return removeTerm(cd.getCloudDescriptor().getCoreNodeName()) || numListeners == 0; @@ -179,7 +190,7 @@ public class ZkShardTerms implements AutoCloseable{ // return true if this object should not be reused boolean removeTerm(String coreNodeName) { ShardTerms newTerms; - while ( (newTerms = terms.removeTerm(coreNodeName)) != null) { + while ( (newTerms = terms.get().removeTerm(coreNodeName)) != null) { try { if (saveTerms(newTerms)) return false; } catch (KeeperException.NoNodeException e) { @@ -196,7 +207,7 @@ public class ZkShardTerms implements AutoCloseable{ */ void registerTerm(String coreNodeName) { ShardTerms newTerms; - while ( (newTerms = terms.registerTerm(coreNodeName)) != null) { + while ( (newTerms = terms.get().registerTerm(coreNodeName)) != null) { if (forceSaveTerms(newTerms)) break; } } @@ -208,14 +219,14 @@ public class ZkShardTerms implements AutoCloseable{ */ public void setTermEqualsToLeader(String coreNodeName) { ShardTerms newTerms; - while ( (newTerms = terms.setTermEqualsToLeader(coreNodeName)) != null) { + while ( (newTerms = terms.get().setTermEqualsToLeader(coreNodeName)) != null) { if (forceSaveTerms(newTerms)) break; } } public void setTermToZero(String coreNodeName) { ShardTerms newTerms; - while ( (newTerms = terms.setTermToZero(coreNodeName)) != null) { + while ( (newTerms = terms.get().setTermToZero(coreNodeName)) != null) { if (forceSaveTerms(newTerms)) break; } } @@ -225,7 +236,7 @@ public class ZkShardTerms implements AutoCloseable{ */ public void startRecovering(String 
coreNodeName) { ShardTerms newTerms; - while ( (newTerms = terms.startRecovering(coreNodeName)) != null) { + while ( (newTerms = terms.get().startRecovering(coreNodeName)) != null) { if (forceSaveTerms(newTerms)) break; } } @@ -235,13 +246,13 @@ public class ZkShardTerms implements AutoCloseable{ */ public void doneRecovering(String coreNodeName) { ShardTerms newTerms; - while ( (newTerms = terms.doneRecovering(coreNodeName)) != null) { + while ( (newTerms = terms.get().doneRecovering(coreNodeName)) != null) { if (forceSaveTerms(newTerms)) break; } } public boolean isRecovering(String name) { - return terms.isRecovering(name); + return terms.get().isRecovering(name); } /** @@ -250,17 +261,17 @@ public class ZkShardTerms implements AutoCloseable{ */ public void ensureHighestTermsAreNotZero() { ShardTerms newTerms; - while ( (newTerms = terms.ensureHighestTermsAreNotZero()) != null) { + while ( (newTerms = terms.get().ensureHighestTermsAreNotZero()) != null) { if (forceSaveTerms(newTerms)) break; } } public long getHighestTerm() { - return terms.getMaxTerm(); + return terms.get().getMaxTerm(); } public long getTerm(String coreNodeName) { - Long term = terms.getTerm(coreNodeName); + Long term = terms.get().getTerm(coreNodeName); return term == null? -1 : term; } @@ -408,12 +419,18 @@ public class ZkShardTerms implements AutoCloseable{ */ private void setNewTerms(ShardTerms newTerms) { boolean isChanged = false; - synchronized (writingLock) { - if (terms == null || newTerms.getVersion() > terms.getVersion()) { - terms = newTerms; - isChanged = true; + for (;;) { + ShardTerms terms = this.terms.get(); + if (terms == null || newTerms.getVersion() > terms.getVersion()) { + if (this.terms.compareAndSet(terms, newTerms)) { + isChanged = true; + break; + } + } else { + break; } } + if (isChanged) onTermUpdates(newTerms); }