SOLR-13897: Fix unsafe publication of Terms object in ZkShardTerms that can cause visibility issues and race conditions under contention

(cherry picked from commit 776631254f)
This commit is contained in:
Shalin Shekhar Mangar 2020-01-27 12:08:20 +05:30
parent e934c8a7ca
commit 7316391d2d
4 changed files with 56 additions and 29 deletions

View File

@ -166,6 +166,9 @@ Bug Fixes
* SOLR-14189: Switch from String.trim() to StringUtils.isBlank() in query parsers (Andy Webb via Uwe Schindler) * SOLR-14189: Switch from String.trim() to StringUtils.isBlank() in query parsers (Andy Webb via Uwe Schindler)
* SOLR-13897: Fix unsafe publication of Terms object in ZkShardTerms that can cause visibility issues
and race conditions under contention. (shalin)
Other Changes Other Changes
--------------------- ---------------------

View File

@ -47,6 +47,12 @@ class ZkCollectionTerms implements AutoCloseable {
} }
} }
public void register(String shardId, String coreNodeName) {
synchronized (terms) {
getShard(shardId).registerTerm(coreNodeName);
}
}
public void remove(String shardId, CoreDescriptor coreDescriptor) { public void remove(String shardId, CoreDescriptor coreDescriptor) {
synchronized (terms) { synchronized (terms) {
if (getShard(shardId).removeTerm(coreDescriptor)) { if (getShard(shardId).removeTerm(coreDescriptor)) {

View File

@ -1204,12 +1204,13 @@ public class ZkController implements Closeable {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, replica is removed from clusterstate"); throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, replica is removed from clusterstate");
} }
ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
if (replica.getType() != Type.PULL) { if (replica.getType() != Type.PULL) {
shardTerms.registerTerm(coreZkNodeName); getCollectionTerms(collection).register(cloudDesc.getShardId(), coreZkNodeName);
} }
ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
log.debug("Register replica - core:{} address:{} collection:{} shard:{}", log.debug("Register replica - core:{} address:{} collection:{} shard:{}",
coreName, baseUrl, collection, shardId); coreName, baseUrl, collection, shardId);

View File

@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.cloud.ShardTerms; import org.apache.solr.client.solrj.cloud.ShardTerms;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
@ -65,7 +66,6 @@ public class ZkShardTerms implements AutoCloseable{
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Object writingLock = new Object();
private final String collection; private final String collection;
private final String shard; private final String shard;
private final String znodePath; private final String znodePath;
@ -73,11 +73,24 @@ public class ZkShardTerms implements AutoCloseable{
private final Set<CoreTermWatcher> listeners = new HashSet<>(); private final Set<CoreTermWatcher> listeners = new HashSet<>();
private final AtomicBoolean isClosed = new AtomicBoolean(false); private final AtomicBoolean isClosed = new AtomicBoolean(false);
private ShardTerms terms; private AtomicReference<ShardTerms> terms = new AtomicReference<>();
// Listener of a core for shard's term change events /**
* Listener of a core for shard's term change events
*/
interface CoreTermWatcher { interface CoreTermWatcher {
// return true if the listener wanna to be triggered in the next time /**
* Invoked with a Terms instance after update. <p/>
* Concurrent invocations of this method is not allowed so at a given time only one thread
* will invoke this method.
* <p/>
* <b>Note</b> - there is no guarantee that the terms version will be strictly monotonic i.e.
* an invocation with a newer terms version <i>can</i> be followed by an invocation with an older
* terms version. Implementations are required to be resilient to out-of-order invocations.
*
* @param terms instance
* @return true if the listener wanna to be triggered in the next time
*/
boolean onTermChanged(ShardTerms terms); boolean onTermChanged(ShardTerms terms);
} }
@ -101,13 +114,13 @@ public class ZkShardTerms implements AutoCloseable{
if (replicasNeedingRecovery.isEmpty()) return; if (replicasNeedingRecovery.isEmpty()) return;
ShardTerms newTerms; ShardTerms newTerms;
while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) { while( (newTerms = terms.get().increaseTerms(leader, replicasNeedingRecovery)) != null) {
if (forceSaveTerms(newTerms)) return; if (forceSaveTerms(newTerms)) return;
} }
} }
public ShardTerms getShardTerms() { public ShardTerms getShardTerms() {
return terms; return terms.get();
} }
/** /**
* Can this replica become leader? * Can this replica become leader?
@ -115,7 +128,7 @@ public class ZkShardTerms implements AutoCloseable{
* @return true if this replica can become leader, false if otherwise * @return true if this replica can become leader, false if otherwise
*/ */
public boolean canBecomeLeader(String coreNodeName) { public boolean canBecomeLeader(String coreNodeName) {
return terms.canBecomeLeader(coreNodeName); return terms.get().canBecomeLeader(coreNodeName);
} }
/** /**
@ -124,7 +137,7 @@ public class ZkShardTerms implements AutoCloseable{
* @return true if this replica has term equals to leader's term, false if otherwise * @return true if this replica has term equals to leader's term, false if otherwise
*/ */
public boolean skipSendingUpdatesTo(String coreNodeName) { public boolean skipSendingUpdatesTo(String coreNodeName) {
return !terms.haveHighestTermValue(coreNodeName); return !terms.get().haveHighestTermValue(coreNodeName);
} }
/** /**
@ -133,7 +146,7 @@ public class ZkShardTerms implements AutoCloseable{
* @return true if this replica registered its term, false if otherwise * @return true if this replica registered its term, false if otherwise
*/ */
public boolean registered(String coreNodeName) { public boolean registered(String coreNodeName) {
return terms.getTerm(coreNodeName) != null; return terms.get().getTerm(coreNodeName) != null;
} }
public void close() { public void close() {
@ -147,9 +160,7 @@ public class ZkShardTerms implements AutoCloseable{
// package private for testing, only used by tests // package private for testing, only used by tests
Map<String, Long> getTerms() { Map<String, Long> getTerms() {
synchronized (writingLock) { return new HashMap<>(terms.get().getTerms());
return terms.getTerms();
}
} }
/** /**
@ -169,7 +180,7 @@ public class ZkShardTerms implements AutoCloseable{
int numListeners; int numListeners;
synchronized (listeners) { synchronized (listeners) {
// solrcore already closed // solrcore already closed
listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms)); listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms.get()));
numListeners = listeners.size(); numListeners = listeners.size();
} }
return removeTerm(cd.getCloudDescriptor().getCoreNodeName()) || numListeners == 0; return removeTerm(cd.getCloudDescriptor().getCoreNodeName()) || numListeners == 0;
@ -179,7 +190,7 @@ public class ZkShardTerms implements AutoCloseable{
// return true if this object should not be reused // return true if this object should not be reused
boolean removeTerm(String coreNodeName) { boolean removeTerm(String coreNodeName) {
ShardTerms newTerms; ShardTerms newTerms;
while ( (newTerms = terms.removeTerm(coreNodeName)) != null) { while ( (newTerms = terms.get().removeTerm(coreNodeName)) != null) {
try { try {
if (saveTerms(newTerms)) return false; if (saveTerms(newTerms)) return false;
} catch (KeeperException.NoNodeException e) { } catch (KeeperException.NoNodeException e) {
@ -196,7 +207,7 @@ public class ZkShardTerms implements AutoCloseable{
*/ */
void registerTerm(String coreNodeName) { void registerTerm(String coreNodeName) {
ShardTerms newTerms; ShardTerms newTerms;
while ( (newTerms = terms.registerTerm(coreNodeName)) != null) { while ( (newTerms = terms.get().registerTerm(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break; if (forceSaveTerms(newTerms)) break;
} }
} }
@ -208,14 +219,14 @@ public class ZkShardTerms implements AutoCloseable{
*/ */
public void setTermEqualsToLeader(String coreNodeName) { public void setTermEqualsToLeader(String coreNodeName) {
ShardTerms newTerms; ShardTerms newTerms;
while ( (newTerms = terms.setTermEqualsToLeader(coreNodeName)) != null) { while ( (newTerms = terms.get().setTermEqualsToLeader(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break; if (forceSaveTerms(newTerms)) break;
} }
} }
public void setTermToZero(String coreNodeName) { public void setTermToZero(String coreNodeName) {
ShardTerms newTerms; ShardTerms newTerms;
while ( (newTerms = terms.setTermToZero(coreNodeName)) != null) { while ( (newTerms = terms.get().setTermToZero(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break; if (forceSaveTerms(newTerms)) break;
} }
} }
@ -225,7 +236,7 @@ public class ZkShardTerms implements AutoCloseable{
*/ */
public void startRecovering(String coreNodeName) { public void startRecovering(String coreNodeName) {
ShardTerms newTerms; ShardTerms newTerms;
while ( (newTerms = terms.startRecovering(coreNodeName)) != null) { while ( (newTerms = terms.get().startRecovering(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break; if (forceSaveTerms(newTerms)) break;
} }
} }
@ -235,13 +246,13 @@ public class ZkShardTerms implements AutoCloseable{
*/ */
public void doneRecovering(String coreNodeName) { public void doneRecovering(String coreNodeName) {
ShardTerms newTerms; ShardTerms newTerms;
while ( (newTerms = terms.doneRecovering(coreNodeName)) != null) { while ( (newTerms = terms.get().doneRecovering(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break; if (forceSaveTerms(newTerms)) break;
} }
} }
public boolean isRecovering(String name) { public boolean isRecovering(String name) {
return terms.isRecovering(name); return terms.get().isRecovering(name);
} }
/** /**
@ -250,17 +261,17 @@ public class ZkShardTerms implements AutoCloseable{
*/ */
public void ensureHighestTermsAreNotZero() { public void ensureHighestTermsAreNotZero() {
ShardTerms newTerms; ShardTerms newTerms;
while ( (newTerms = terms.ensureHighestTermsAreNotZero()) != null) { while ( (newTerms = terms.get().ensureHighestTermsAreNotZero()) != null) {
if (forceSaveTerms(newTerms)) break; if (forceSaveTerms(newTerms)) break;
} }
} }
public long getHighestTerm() { public long getHighestTerm() {
return terms.getMaxTerm(); return terms.get().getMaxTerm();
} }
public long getTerm(String coreNodeName) { public long getTerm(String coreNodeName) {
Long term = terms.getTerm(coreNodeName); Long term = terms.get().getTerm(coreNodeName);
return term == null? -1 : term; return term == null? -1 : term;
} }
@ -408,12 +419,18 @@ public class ZkShardTerms implements AutoCloseable{
*/ */
private void setNewTerms(ShardTerms newTerms) { private void setNewTerms(ShardTerms newTerms) {
boolean isChanged = false; boolean isChanged = false;
synchronized (writingLock) { for (;;) {
if (terms == null || newTerms.getVersion() > terms.getVersion()) { ShardTerms terms = this.terms.get();
terms = newTerms; if (terms == null || newTerms.getVersion() > terms.getVersion()) {
isChanged = true; if (this.terms.compareAndSet(terms, newTerms)) {
isChanged = true;
break;
}
} else {
break;
} }
} }
if (isChanged) onTermUpdates(newTerms); if (isChanged) onTermUpdates(newTerms);
} }