mirror of https://github.com/apache/lucene.git
SOLR-9440: The ZkStateReader.removeCollectionStateWatcher method can cache a DocCollection reference and never update it, causing stale state to be returned in ClusterState
This commit is contained in:
parent
487f67620d
commit
39376cd8b5
|
@ -89,6 +89,9 @@ Bug Fixes
|
|||
|
||||
* SOLR-11413: SolrGraphiteReporter fails to report metrics due to non-thread safe code. (Erik Persson, ab)
|
||||
|
||||
* SOLR-9440: The ZkStateReader.removeCollectionStateWatcher method can cache a DocCollection reference and
|
||||
never update it, causing stale state to be returned in ClusterState. (shalin)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
* SOLR-11285: Refactor autoscaling framework to avoid direct references to Zookeeper and Solr
|
||||
|
|
|
@ -424,25 +424,19 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
|
|||
|
||||
boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
|
||||
TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS);
|
||||
// TODO: remove this workaround for SOLR-9440
|
||||
zkStateReader.registerCore(collectionName);
|
||||
try {
|
||||
while (! timeout.hasTimedOut()) {
|
||||
Thread.sleep(100);
|
||||
DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName);
|
||||
if (docCollection == null) { // someone already deleted the collection
|
||||
return true;
|
||||
}
|
||||
Slice slice = docCollection.getSlice(shard);
|
||||
if(slice == null || slice.getReplica(replicaName) == null) {
|
||||
return true;
|
||||
}
|
||||
while (! timeout.hasTimedOut()) {
|
||||
Thread.sleep(100);
|
||||
DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName);
|
||||
if (docCollection == null) { // someone already deleted the collection
|
||||
return true;
|
||||
}
|
||||
Slice slice = docCollection.getSlice(shard);
|
||||
if(slice == null || slice.getReplica(replicaName) == null) {
|
||||
return true;
|
||||
}
|
||||
// replica still exists after the timeout
|
||||
return false;
|
||||
} finally {
|
||||
zkStateReader.unregisterCore(collectionName);
|
||||
}
|
||||
// replica still exists after the timeout
|
||||
return false;
|
||||
}
|
||||
|
||||
void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws Exception {
|
||||
|
|
|
@ -359,14 +359,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
|||
};
|
||||
|
||||
Thread monkeyThread = null;
|
||||
/*
|
||||
somehow the cluster state object inside this zk state reader has static copy of the collection which is never updated
|
||||
so any call to waitForRecoveriesToFinish just keeps looping until timeout.
|
||||
We workaround by explicitly registering the collection as an interesting one so that it is watched by ZkStateReader
|
||||
see SOLR-9440. Todo remove this hack after SOLR-9440 is fixed.
|
||||
*/
|
||||
cloudClient.getZkStateReader().registerCore(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
|
||||
|
||||
monkeyThread = new Thread(monkey);
|
||||
monkeyThread.start();
|
||||
try {
|
||||
|
|
|
@ -104,7 +104,6 @@ public class TestCloudSearcherWarming extends SolrCloudTestCase {
|
|||
waitForState("The collection should have 1 shard and 1 replica", collectionName, clusterShape(1, 1));
|
||||
|
||||
solrClient.setDefaultCollection(collectionName);
|
||||
solrClient.getZkStateReader().registerCore(collectionName);
|
||||
|
||||
String addListenerCommand = "{" +
|
||||
"'add-listener' : {'name':'newSearcherListener','event':'newSearcher', 'class':'" + SleepingSolrEventListener.class.getName() + "'}" +
|
||||
|
@ -144,7 +143,6 @@ public class TestCloudSearcherWarming extends SolrCloudTestCase {
|
|||
waitForState("The collection should have 1 shard and 1 replica", collectionName, clusterShape(1, 1));
|
||||
|
||||
solrClient.setDefaultCollection(collectionName);
|
||||
solrClient.getZkStateReader().registerCore(collectionName);
|
||||
|
||||
String addListenerCommand = "{" +
|
||||
"'add-listener' : {'name':'newSearcherListener','event':'newSearcher', 'class':'" + SleepingSolrEventListener.class.getName() + "'}" +
|
||||
|
|
|
@ -261,7 +261,6 @@ public class TestPullReplica extends SolrCloudTestCase {
|
|||
CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1, 0, 0)
|
||||
.setMaxShardsPerNode(100)
|
||||
.process(cluster.getSolrClient());
|
||||
cluster.getSolrClient().getZkStateReader().registerCore(collectionName); //TODO: Why is this needed? see SOLR-9440
|
||||
waitForState("Expected collection to be created with 2 shards and 1 replica each", collectionName, clusterShape(2, 1));
|
||||
DocCollection docCollection = assertNumberOfReplicas(2, 0, 0, false, true);
|
||||
assertEquals(2, docCollection.getSlices().size());
|
||||
|
@ -370,7 +369,6 @@ public class TestPullReplica extends SolrCloudTestCase {
|
|||
CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, 1)
|
||||
.setMaxShardsPerNode(100)
|
||||
.process(cluster.getSolrClient());
|
||||
cluster.getSolrClient().getZkStateReader().registerCore(collectionName); //TODO: Why is this needed? see SOLR-9440
|
||||
waitForState("Expected collection to be created with 1 shard and 2 replicas", collectionName, clusterShape(1, 2));
|
||||
DocCollection docCollection = assertNumberOfReplicas(1, 0, 1, false, true);
|
||||
|
||||
|
|
|
@ -729,8 +729,7 @@ public class TestTlogReplica extends SolrCloudTestCase {
|
|||
.setMaxShardsPerNode(100)
|
||||
.process(cluster.getSolrClient());
|
||||
int numReplicasPerShard = numNrtReplicas + numTlogReplicas + numPullReplicas;
|
||||
cluster.getSolrClient().getZkStateReader().registerCore(collectionName); //TODO: Why is this needed? see SOLR-9440
|
||||
waitForState("Expected collection to be created with " + numShards + " shards and " + numReplicasPerShard + " replicas",
|
||||
waitForState("Expected collection to be created with " + numShards + " shards and " + numReplicasPerShard + " replicas",
|
||||
collectionName, clusterShape(numShards, numReplicasPerShard));
|
||||
return assertNumberOfReplicas(numNrtReplicas*numShards, numTlogReplicas*numShards, numPullReplicas*numShards, false, true);
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
|||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.cloud.ClusterStateUtil;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
|
@ -77,10 +78,6 @@ public class AutoAddReplicasIntegrationTest extends SolrCloudTestCase {
|
|||
.process(cluster.getSolrClient());
|
||||
|
||||
ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
|
||||
// todo remove this workaround after SOLR-9440 is fixed
|
||||
zkStateReader.registerCore("testSimple1");
|
||||
zkStateReader.registerCore("testSimple2");
|
||||
zkStateReader.registerCore("testSimple3");
|
||||
|
||||
// start the tests
|
||||
JettySolrRunner lostJetty = random().nextBoolean() ? cluster.getJettySolrRunner(0) : cluster.getJettySolrRunner(1);
|
||||
|
@ -91,6 +88,7 @@ public class AutoAddReplicasIntegrationTest extends SolrCloudTestCase {
|
|||
waitForState("Waiting for collection " + COLLECTION1, COLLECTION1, clusterShape(2, 2));
|
||||
checkSharedFsReplicasMovedCorrectly(replacedHdfsReplicas, zkStateReader, COLLECTION1);
|
||||
lostJetty.start();
|
||||
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 90000));
|
||||
|
||||
// check cluster property is considered
|
||||
disableAutoAddReplicasInCluster();
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.solr.common.cloud.DocRouter;
|
|||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
|
||||
public class ZkStateReaderTest extends SolrTestCaseJ4 {
|
||||
|
||||
|
@ -177,6 +178,56 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
|
|||
}
|
||||
}
|
||||
|
||||
public void testCollectionStateWatcherCaching() throws Exception {
|
||||
String zkDir = createTempDir("testCollectionStateWatcherCaching").toFile().getAbsolutePath();
|
||||
|
||||
ZkTestServer server = new ZkTestServer(zkDir);
|
||||
|
||||
SolrZkClient zkClient = null;
|
||||
ZkStateReader reader = null;
|
||||
|
||||
try {
|
||||
server.run();
|
||||
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
|
||||
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
|
||||
|
||||
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
|
||||
ZkController.createClusterZkNodes(zkClient);
|
||||
|
||||
reader = new ZkStateReader(zkClient);
|
||||
reader.createClusterStateWatchersAndUpdate();
|
||||
|
||||
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
|
||||
|
||||
ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
|
||||
DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
|
||||
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
|
||||
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
|
||||
writer.writePendingUpdates();
|
||||
assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
|
||||
reader.waitForState("c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null);
|
||||
|
||||
state = new DocCollection("c1", new HashMap<>(), Collections.singletonMap("x", "y"), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
|
||||
wc = new ZkWriteCommand("c1", state);
|
||||
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
|
||||
writer.writePendingUpdates();
|
||||
|
||||
boolean found = false;
|
||||
TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS);
|
||||
while (!timeOut.hasTimedOut()) {
|
||||
DocCollection c1 = reader.getClusterState().getCollection("c1");
|
||||
if ("y".equals(c1.getStr("x"))) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertTrue("Could not find updated property in collection c1 even after 5 seconds", found);
|
||||
} finally {
|
||||
IOUtils.close(reader, zkClient);
|
||||
server.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void testWatchedCollectionCreation() throws Exception {
|
||||
String zkDir = createTempDir("testWatchedCollectionCreation").toFile().getAbsolutePath();
|
||||
|
||||
|
|
|
@ -179,9 +179,6 @@ public class AutoscalingHistoryHandlerTest extends SolrCloudTestCase {
|
|||
clusterShape(1, 3));
|
||||
waitForState("Timed out wait for collection be active", CollectionAdminParams.SYSTEM_COLL,
|
||||
clusterShape(1, 3));
|
||||
// todo remove this workaround after SOLR-9440
|
||||
cluster.getSolrClient().getZkStateReader().registerCore(".system");
|
||||
cluster.getSolrClient().getZkStateReader().registerCore(PREFIX + "_collection");
|
||||
|
||||
log.info("### Start add node...");
|
||||
JettySolrRunner jetty = cluster.startJettySolrRunner();
|
||||
|
|
|
@ -158,7 +158,7 @@ public class ZkStateReader implements Closeable {
|
|||
|
||||
private final Runnable securityNodeListener;
|
||||
|
||||
private ConcurrentHashMap<String, CollectionWatch> collectionWatches = new ConcurrentHashMap<>();
|
||||
public ConcurrentHashMap<String, CollectionWatch> collectionWatches = new ConcurrentHashMap<>();
|
||||
|
||||
private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
|
||||
|
||||
|
@ -1340,14 +1340,24 @@ public class ZkStateReader implements Closeable {
|
|||
* @param watcher the watcher
|
||||
*/
|
||||
public void removeCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
|
||||
AtomicBoolean reconstructState = new AtomicBoolean(false);
|
||||
collectionWatches.compute(collection, (k, v) -> {
|
||||
if (v == null)
|
||||
return null;
|
||||
v.stateWatchers.remove(watcher);
|
||||
if (v.canBeRemoved())
|
||||
if (v.canBeRemoved()) {
|
||||
watchedCollectionStates.remove(collection);
|
||||
lazyCollectionStates.put(collection, new LazyCollectionRef(collection));
|
||||
reconstructState.set(true);
|
||||
return null;
|
||||
}
|
||||
return v;
|
||||
});
|
||||
if (reconstructState.get()) {
|
||||
synchronized (getUpdateLock()) {
|
||||
constructState(Collections.emptySet());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* package-private for testing */
|
||||
|
|
Loading…
Reference in New Issue