SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a SolrResponse node is in the overseer queue

This commit is contained in:
Shalin Shekhar Mangar 2016-04-06 11:56:27 +05:30
parent a5afd1cee8
commit 4205b1c804
3 changed files with 85 additions and 6 deletions

View File

@ -104,6 +104,9 @@ Bug Fixes
* SOLR-8875: SolrCloud Overseer clusterState could unexpectedly be null resulting in NPE.
(Scott Blum via David Smiley)
* SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a
SolrResponse node is in the overseer queue. (Jessica Cheng Mallet via shalin)
Optimizations
----------------------
* SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.

View File

@ -61,7 +61,7 @@ public class OverseerTaskQueue extends DistributedQueue {
List<String> childNames = zookeeper.getChildren(dir, null, true);
stats.setQueueLength(childNames.size());
for (String childName : childNames) {
if (childName != null) {
if (childName != null && childName.startsWith(PREFIX)) {
try {
byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
if (data != null) {
@ -185,17 +185,14 @@ public class OverseerTaskQueue extends DistributedQueue {
try {
// Create and watch the response node before creating the request node;
// otherwise we may miss the response.
String watchID = createData(
dir + "/" + response_prefix,
null, CreateMode.EPHEMERAL_SEQUENTIAL);
String watchID = createResponseNode();
Object lock = new Object();
LatchWatcher watcher = new LatchWatcher(lock);
Stat stat = zookeeper.exists(watchID, watcher, true);
// create the request node
createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1),
data, CreateMode.PERSISTENT);
createRequestNode(data, watchID);
synchronized (lock) {
if (stat != null && watcher.getWatchedEvent() == null) {
@ -213,6 +210,18 @@ public class OverseerTaskQueue extends DistributedQueue {
}
}
void createRequestNode(byte[] data, String watchID) throws KeeperException, InterruptedException {
createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1),
data, CreateMode.PERSISTENT);
}
String createResponseNode() throws KeeperException, InterruptedException {
return createData(
dir + "/" + response_prefix,
null, CreateMode.EPHEMERAL_SEQUENTIAL);
}
public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, long waitMillis)
throws KeeperException, InterruptedException {
ArrayList<QueueEvent> topN = new ArrayList<>();

View File

@ -16,6 +16,19 @@
*/
package org.apache.solr.cloud;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.response.SolrResponseBase;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.Utils;
import org.junit.Test;
public class OverseerTaskQueueTest extends DistributedQueueTest {
@ -25,4 +38,58 @@ public class OverseerTaskQueueTest extends DistributedQueueTest {
protected OverseerTaskQueue makeDistributedQueue(String dqZNode) throws Exception {
return new OverseerTaskQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
}
@Test
public void testContainsTaskWithRequestId() throws Exception {
String tqZNode = "/taskqueue/test";
String requestId = "foo";
String nonExistentRequestId = "bar";
OverseerTaskQueue tq = makeDistributedQueue(tqZNode);
// Basic ops
// Put an expected Overseer task onto the queue
final Map<String, Object> props = new HashMap<>();
props.put(CommonParams.NAME, "coll1");
props.put(OverseerCollectionMessageHandler.COLL_CONF, "myconf");
props.put(OverseerCollectionMessageHandler.NUM_SLICES, 1);
props.put(ZkStateReader.REPLICATION_FACTOR, 3);
props.put(CommonAdminParams.ASYNC, requestId);
tq.offer(Utils.toJSON(props));
assertTrue("Task queue should contain task with requestid " + requestId,
tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, requestId));
assertFalse("Task queue should not contain task with requestid " + nonExistentRequestId,
tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, nonExistentRequestId));
// Create a response node as if someone is waiting for a response from the Overseer; then,
// create the request node.
// Here we're reaching a bit into the internals of OverseerTaskQueue in order to create the same
// response node structure but without setting a watch on it and removing it immediately when
// a response is set, in order to artificially create the race condition that
// containsTaskWithRequestId runs while the response is still in the queue.
String watchID = tq.createResponseNode();
String requestId2 = "baz";
props.put(CommonAdminParams.ASYNC, requestId2);
tq.createRequestNode(Utils.toJSON(props), watchID);
// Set a SolrResponse as the response node by removing the QueueEvent, as done in OverseerTaskProcessor
List<OverseerTaskQueue.QueueEvent> queueEvents = tq.peekTopN(2, Collections.emptySet(), 1000);
OverseerTaskQueue.QueueEvent requestId2Event = null;
for (OverseerTaskQueue.QueueEvent queueEvent : queueEvents) {
Map<String, Object> eventProps = (Map<String, Object>) Utils.fromJSON(queueEvent.getBytes());
if (requestId2.equals(eventProps.get(CommonAdminParams.ASYNC))) {
requestId2Event = queueEvent;
break;
}
}
assertNotNull("Didn't find event with requestid " + requestId2, requestId2Event);
requestId2Event.setBytes(SolrResponse.serializable(new SolrResponseBase()));
tq.remove(requestId2Event);
// Make sure this call to check if requestId exists doesn't barf with Json parse exception
assertTrue("Task queue should contain task with requestid " + requestId,
tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, requestId));
}
}