mirror of
https://github.com/apache/lucene.git
synced 2025-02-08 19:15:06 +00:00
SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a SolrResponse node is in the overseer queue
This commit is contained in:
parent
a5afd1cee8
commit
4205b1c804
@ -104,6 +104,9 @@ Bug Fixes
|
||||
* SOLR-8875: SolrCloud Overseer clusterState could unexpectedly be null resulting in NPE.
|
||||
(Scott Blum via David Smiley)
|
||||
|
||||
* SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a
|
||||
SolrResponse node is in the overseer queue. (Jessica Cheng Mallet via shalin)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
* SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.
|
||||
|
@ -61,7 +61,7 @@ public class OverseerTaskQueue extends DistributedQueue {
|
||||
List<String> childNames = zookeeper.getChildren(dir, null, true);
|
||||
stats.setQueueLength(childNames.size());
|
||||
for (String childName : childNames) {
|
||||
if (childName != null) {
|
||||
if (childName != null && childName.startsWith(PREFIX)) {
|
||||
try {
|
||||
byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
|
||||
if (data != null) {
|
||||
@ -185,17 +185,14 @@ public class OverseerTaskQueue extends DistributedQueue {
|
||||
try {
|
||||
// Create and watch the response node before creating the request node;
|
||||
// otherwise we may miss the response.
|
||||
String watchID = createData(
|
||||
dir + "/" + response_prefix,
|
||||
null, CreateMode.EPHEMERAL_SEQUENTIAL);
|
||||
String watchID = createResponseNode();
|
||||
|
||||
Object lock = new Object();
|
||||
LatchWatcher watcher = new LatchWatcher(lock);
|
||||
Stat stat = zookeeper.exists(watchID, watcher, true);
|
||||
|
||||
// create the request node
|
||||
createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1),
|
||||
data, CreateMode.PERSISTENT);
|
||||
createRequestNode(data, watchID);
|
||||
|
||||
synchronized (lock) {
|
||||
if (stat != null && watcher.getWatchedEvent() == null) {
|
||||
@ -213,6 +210,18 @@ public class OverseerTaskQueue extends DistributedQueue {
|
||||
}
|
||||
}
|
||||
|
||||
void createRequestNode(byte[] data, String watchID) throws KeeperException, InterruptedException {
|
||||
createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1),
|
||||
data, CreateMode.PERSISTENT);
|
||||
}
|
||||
|
||||
String createResponseNode() throws KeeperException, InterruptedException {
|
||||
return createData(
|
||||
dir + "/" + response_prefix,
|
||||
null, CreateMode.EPHEMERAL_SEQUENTIAL);
|
||||
}
|
||||
|
||||
|
||||
public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, long waitMillis)
|
||||
throws KeeperException, InterruptedException {
|
||||
ArrayList<QueueEvent> topN = new ArrayList<>();
|
||||
|
@ -16,6 +16,19 @@
|
||||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
import org.apache.solr.client.solrj.response.SolrResponseBase;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CommonAdminParams;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.junit.Test;
|
||||
|
||||
public class OverseerTaskQueueTest extends DistributedQueueTest {
|
||||
|
||||
|
||||
@ -25,4 +38,58 @@ public class OverseerTaskQueueTest extends DistributedQueueTest {
|
||||
protected OverseerTaskQueue makeDistributedQueue(String dqZNode) throws Exception {
|
||||
return new OverseerTaskQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainsTaskWithRequestId() throws Exception {
|
||||
String tqZNode = "/taskqueue/test";
|
||||
String requestId = "foo";
|
||||
String nonExistentRequestId = "bar";
|
||||
|
||||
OverseerTaskQueue tq = makeDistributedQueue(tqZNode);
|
||||
|
||||
// Basic ops
|
||||
// Put an expected Overseer task onto the queue
|
||||
final Map<String, Object> props = new HashMap<>();
|
||||
props.put(CommonParams.NAME, "coll1");
|
||||
props.put(OverseerCollectionMessageHandler.COLL_CONF, "myconf");
|
||||
props.put(OverseerCollectionMessageHandler.NUM_SLICES, 1);
|
||||
props.put(ZkStateReader.REPLICATION_FACTOR, 3);
|
||||
props.put(CommonAdminParams.ASYNC, requestId);
|
||||
tq.offer(Utils.toJSON(props));
|
||||
|
||||
assertTrue("Task queue should contain task with requestid " + requestId,
|
||||
tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, requestId));
|
||||
|
||||
assertFalse("Task queue should not contain task with requestid " + nonExistentRequestId,
|
||||
tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, nonExistentRequestId));
|
||||
|
||||
// Create a response node as if someone is waiting for a response from the Overseer; then,
|
||||
// create the request node.
|
||||
// Here we're reaching a bit into the internals of OverseerTaskQueue in order to create the same
|
||||
// response node structure but without setting a watch on it and removing it immediately when
|
||||
// a response is set, in order to artificially create the race condition that
|
||||
// containsTaskWithRequestId runs while the response is still in the queue.
|
||||
String watchID = tq.createResponseNode();
|
||||
String requestId2 = "baz";
|
||||
props.put(CommonAdminParams.ASYNC, requestId2);
|
||||
tq.createRequestNode(Utils.toJSON(props), watchID);
|
||||
|
||||
// Set a SolrResponse as the response node by removing the QueueEvent, as done in OverseerTaskProcessor
|
||||
List<OverseerTaskQueue.QueueEvent> queueEvents = tq.peekTopN(2, Collections.emptySet(), 1000);
|
||||
OverseerTaskQueue.QueueEvent requestId2Event = null;
|
||||
for (OverseerTaskQueue.QueueEvent queueEvent : queueEvents) {
|
||||
Map<String, Object> eventProps = (Map<String, Object>) Utils.fromJSON(queueEvent.getBytes());
|
||||
if (requestId2.equals(eventProps.get(CommonAdminParams.ASYNC))) {
|
||||
requestId2Event = queueEvent;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertNotNull("Didn't find event with requestid " + requestId2, requestId2Event);
|
||||
requestId2Event.setBytes(SolrResponse.serializable(new SolrResponseBase()));
|
||||
tq.remove(requestId2Event);
|
||||
|
||||
// Make sure this call to check if requestId exists doesn't barf with Json parse exception
|
||||
assertTrue("Task queue should contain task with requestid " + requestId,
|
||||
tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, requestId));
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user