mirror of
https://github.com/apache/lucene.git
synced 2025-02-12 21:15:19 +00:00
SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a SolrResponse node is in the overseer queue
This commit is contained in:
parent
a5afd1cee8
commit
4205b1c804
@ -104,6 +104,9 @@ Bug Fixes
|
|||||||
* SOLR-8875: SolrCloud Overseer clusterState could unexpectedly be null resulting in NPE.
|
* SOLR-8875: SolrCloud Overseer clusterState could unexpectedly be null resulting in NPE.
|
||||||
(Scott Blum via David Smiley)
|
(Scott Blum via David Smiley)
|
||||||
|
|
||||||
|
* SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a
|
||||||
|
SolrResponse node is in the overseer queue. (Jessica Cheng Mallet via shalin)
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
----------------------
|
----------------------
|
||||||
* SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.
|
* SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.
|
||||||
|
@ -61,7 +61,7 @@ public class OverseerTaskQueue extends DistributedQueue {
|
|||||||
List<String> childNames = zookeeper.getChildren(dir, null, true);
|
List<String> childNames = zookeeper.getChildren(dir, null, true);
|
||||||
stats.setQueueLength(childNames.size());
|
stats.setQueueLength(childNames.size());
|
||||||
for (String childName : childNames) {
|
for (String childName : childNames) {
|
||||||
if (childName != null) {
|
if (childName != null && childName.startsWith(PREFIX)) {
|
||||||
try {
|
try {
|
||||||
byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
|
byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
|
||||||
if (data != null) {
|
if (data != null) {
|
||||||
@ -185,17 +185,14 @@ public class OverseerTaskQueue extends DistributedQueue {
|
|||||||
try {
|
try {
|
||||||
// Create and watch the response node before creating the request node;
|
// Create and watch the response node before creating the request node;
|
||||||
// otherwise we may miss the response.
|
// otherwise we may miss the response.
|
||||||
String watchID = createData(
|
String watchID = createResponseNode();
|
||||||
dir + "/" + response_prefix,
|
|
||||||
null, CreateMode.EPHEMERAL_SEQUENTIAL);
|
|
||||||
|
|
||||||
Object lock = new Object();
|
Object lock = new Object();
|
||||||
LatchWatcher watcher = new LatchWatcher(lock);
|
LatchWatcher watcher = new LatchWatcher(lock);
|
||||||
Stat stat = zookeeper.exists(watchID, watcher, true);
|
Stat stat = zookeeper.exists(watchID, watcher, true);
|
||||||
|
|
||||||
// create the request node
|
// create the request node
|
||||||
createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1),
|
createRequestNode(data, watchID);
|
||||||
data, CreateMode.PERSISTENT);
|
|
||||||
|
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
if (stat != null && watcher.getWatchedEvent() == null) {
|
if (stat != null && watcher.getWatchedEvent() == null) {
|
||||||
@ -213,6 +210,18 @@ public class OverseerTaskQueue extends DistributedQueue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void createRequestNode(byte[] data, String watchID) throws KeeperException, InterruptedException {
|
||||||
|
createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1),
|
||||||
|
data, CreateMode.PERSISTENT);
|
||||||
|
}
|
||||||
|
|
||||||
|
String createResponseNode() throws KeeperException, InterruptedException {
|
||||||
|
return createData(
|
||||||
|
dir + "/" + response_prefix,
|
||||||
|
null, CreateMode.EPHEMERAL_SEQUENTIAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, long waitMillis)
|
public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, long waitMillis)
|
||||||
throws KeeperException, InterruptedException {
|
throws KeeperException, InterruptedException {
|
||||||
ArrayList<QueueEvent> topN = new ArrayList<>();
|
ArrayList<QueueEvent> topN = new ArrayList<>();
|
||||||
|
@ -16,6 +16,19 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.solr.cloud;
|
package org.apache.solr.cloud;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.SolrResponse;
|
||||||
|
import org.apache.solr.client.solrj.response.SolrResponseBase;
|
||||||
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
|
import org.apache.solr.common.params.CommonAdminParams;
|
||||||
|
import org.apache.solr.common.params.CommonParams;
|
||||||
|
import org.apache.solr.common.util.Utils;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
public class OverseerTaskQueueTest extends DistributedQueueTest {
|
public class OverseerTaskQueueTest extends DistributedQueueTest {
|
||||||
|
|
||||||
|
|
||||||
@ -25,4 +38,58 @@ public class OverseerTaskQueueTest extends DistributedQueueTest {
|
|||||||
protected OverseerTaskQueue makeDistributedQueue(String dqZNode) throws Exception {
|
protected OverseerTaskQueue makeDistributedQueue(String dqZNode) throws Exception {
|
||||||
return new OverseerTaskQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
|
return new OverseerTaskQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainsTaskWithRequestId() throws Exception {
|
||||||
|
String tqZNode = "/taskqueue/test";
|
||||||
|
String requestId = "foo";
|
||||||
|
String nonExistentRequestId = "bar";
|
||||||
|
|
||||||
|
OverseerTaskQueue tq = makeDistributedQueue(tqZNode);
|
||||||
|
|
||||||
|
// Basic ops
|
||||||
|
// Put an expected Overseer task onto the queue
|
||||||
|
final Map<String, Object> props = new HashMap<>();
|
||||||
|
props.put(CommonParams.NAME, "coll1");
|
||||||
|
props.put(OverseerCollectionMessageHandler.COLL_CONF, "myconf");
|
||||||
|
props.put(OverseerCollectionMessageHandler.NUM_SLICES, 1);
|
||||||
|
props.put(ZkStateReader.REPLICATION_FACTOR, 3);
|
||||||
|
props.put(CommonAdminParams.ASYNC, requestId);
|
||||||
|
tq.offer(Utils.toJSON(props));
|
||||||
|
|
||||||
|
assertTrue("Task queue should contain task with requestid " + requestId,
|
||||||
|
tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, requestId));
|
||||||
|
|
||||||
|
assertFalse("Task queue should not contain task with requestid " + nonExistentRequestId,
|
||||||
|
tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, nonExistentRequestId));
|
||||||
|
|
||||||
|
// Create a response node as if someone is waiting for a response from the Overseer; then,
|
||||||
|
// create the request node.
|
||||||
|
// Here we're reaching a bit into the internals of OverseerTaskQueue in order to create the same
|
||||||
|
// response node structure but without setting a watch on it and removing it immediately when
|
||||||
|
// a response is set, in order to artificially create the race condition that
|
||||||
|
// containsTaskWithRequestId runs while the response is still in the queue.
|
||||||
|
String watchID = tq.createResponseNode();
|
||||||
|
String requestId2 = "baz";
|
||||||
|
props.put(CommonAdminParams.ASYNC, requestId2);
|
||||||
|
tq.createRequestNode(Utils.toJSON(props), watchID);
|
||||||
|
|
||||||
|
// Set a SolrResponse as the response node by removing the QueueEvent, as done in OverseerTaskProcessor
|
||||||
|
List<OverseerTaskQueue.QueueEvent> queueEvents = tq.peekTopN(2, Collections.emptySet(), 1000);
|
||||||
|
OverseerTaskQueue.QueueEvent requestId2Event = null;
|
||||||
|
for (OverseerTaskQueue.QueueEvent queueEvent : queueEvents) {
|
||||||
|
Map<String, Object> eventProps = (Map<String, Object>) Utils.fromJSON(queueEvent.getBytes());
|
||||||
|
if (requestId2.equals(eventProps.get(CommonAdminParams.ASYNC))) {
|
||||||
|
requestId2Event = queueEvent;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertNotNull("Didn't find event with requestid " + requestId2, requestId2Event);
|
||||||
|
requestId2Event.setBytes(SolrResponse.serializable(new SolrResponseBase()));
|
||||||
|
tq.remove(requestId2Event);
|
||||||
|
|
||||||
|
// Make sure this call to check whether requestId exists doesn't fail with a JSON parse exception
|
||||||
|
assertTrue("Task queue should contain task with requestid " + requestId,
|
||||||
|
tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, requestId));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user