mirror of https://github.com/apache/lucene.git
Backout fix for SOLR-6631 as things like create collection are hanging now
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1635155 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e3f636a276
commit
cb0b53eaac
|
@ -278,9 +278,6 @@ Bug Fixes
|
||||||
* SOLR-6591: Overseer can use stale cluster state and lose updates for collections
|
* SOLR-6591: Overseer can use stale cluster state and lose updates for collections
|
||||||
with stateFormat > 1. (shalin)
|
with stateFormat > 1. (shalin)
|
||||||
|
|
||||||
* SOLR-6631: DistributedQueue spinning on calling zookeeper getChildren()
|
|
||||||
(Jessica Cheng Mallet, Mark Miller, Timothy Potter)
|
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -246,16 +246,13 @@ public class DistributedQueue {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(WatchedEvent event) {
|
public void process(WatchedEvent event) {
|
||||||
Event.EventType eventType = event.getType();
|
|
||||||
LOG.info("LatchChildWatcher fired on path: " + event.getPath() + " state: "
|
LOG.info("LatchChildWatcher fired on path: " + event.getPath() + " state: "
|
||||||
+ event.getState() + " type " + eventType);
|
+ event.getState() + " type " + event.getType());
|
||||||
if (eventType == Event.EventType.NodeChildrenChanged) {
|
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
this.event = event;
|
this.event = event;
|
||||||
lock.notifyAll();
|
lock.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
public void await(long timeout) throws InterruptedException {
|
public void await(long timeout) throws InterruptedException {
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
|
|
|
@ -1,131 +0,0 @@
|
||||||
package org.apache.solr.cloud;
|
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with this
|
|
||||||
* work for additional information regarding copyright ownership. The ASF
|
|
||||||
* licenses this file to You under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations under
|
|
||||||
* the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
|
|
||||||
import org.apache.solr.SolrTestCaseJ4;
|
|
||||||
import org.apache.solr.common.cloud.SolrZkClient;
|
|
||||||
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
|
|
||||||
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
public class DistributedQueueTest extends SolrTestCaseJ4 {
|
|
||||||
|
|
||||||
protected ZkTestServer zkServer;
|
|
||||||
protected SolrZkClient zkClient;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
@Override
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
super.setUp();
|
|
||||||
setupZk();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testDistributedQueue() throws Exception {
|
|
||||||
String dqZNode = "/distqueue/test";
|
|
||||||
String testData = "hello world";
|
|
||||||
long timeoutMs = 500L;
|
|
||||||
|
|
||||||
DistributedQueue dq = new DistributedQueue(zkClient, setupDistributedQueueZNode(dqZNode));
|
|
||||||
|
|
||||||
// basic ops
|
|
||||||
assertTrue(dq.poll() == null);
|
|
||||||
byte[] data = testData.getBytes("UTF-8");
|
|
||||||
dq.offer(data);
|
|
||||||
assertEquals(new String(dq.peek(),"UTF-8"), testData);
|
|
||||||
assertEquals(new String(dq.take(),"UTF-8"), testData);
|
|
||||||
assertTrue(dq.poll() == null);
|
|
||||||
QueueEvent qe = dq.offer(data, timeoutMs);
|
|
||||||
assertNotNull(qe);
|
|
||||||
assertEquals(new String(dq.remove(),"UTF-8"), testData);
|
|
||||||
|
|
||||||
// should block until the background thread makes the offer
|
|
||||||
(new QueueChangerThread(dq, 1000)).start();
|
|
||||||
qe = dq.peek(true);
|
|
||||||
assertNotNull(qe);
|
|
||||||
dq.remove();
|
|
||||||
|
|
||||||
// timeout scenario ... background thread won't offer until long after the peek times out
|
|
||||||
QueueChangerThread qct = new QueueChangerThread(dq, 1000);
|
|
||||||
qct.start();
|
|
||||||
qe = dq.peek(500);
|
|
||||||
assertTrue(qe == null);
|
|
||||||
|
|
||||||
try {
|
|
||||||
qct.interrupt();
|
|
||||||
} catch (Exception exc) {}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class QueueChangerThread extends Thread {
|
|
||||||
|
|
||||||
DistributedQueue dq;
|
|
||||||
long waitBeforeOfferMs;
|
|
||||||
|
|
||||||
QueueChangerThread(DistributedQueue dq, long waitBeforeOfferMs) {
|
|
||||||
this.dq = dq;
|
|
||||||
this.waitBeforeOfferMs = waitBeforeOfferMs;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
Thread.sleep(waitBeforeOfferMs);
|
|
||||||
dq.offer(getName().getBytes("UTF-8"));
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
// do nothing
|
|
||||||
} catch (Exception exc) {
|
|
||||||
throw new RuntimeException(exc);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String setupDistributedQueueZNode(String znodePath) throws Exception {
|
|
||||||
if (!zkClient.exists("/", true))
|
|
||||||
zkClient.makePath("/", false, true);
|
|
||||||
if (zkClient.exists(znodePath, true))
|
|
||||||
zkClient.clean(znodePath);
|
|
||||||
zkClient.makePath(znodePath, false, true);
|
|
||||||
return znodePath;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@After
|
|
||||||
public void tearDown() throws Exception {
|
|
||||||
try {
|
|
||||||
super.tearDown();
|
|
||||||
} catch (Exception exc) {}
|
|
||||||
closeZk();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void setupZk() throws Exception {
|
|
||||||
System.setProperty("zkClientTimeout", "8000");
|
|
||||||
zkServer = new ZkTestServer(createTempDir("zkData").toFile().getAbsolutePath());
|
|
||||||
zkServer.run();
|
|
||||||
System.setProperty("zkHost", zkServer.getZkAddress());
|
|
||||||
zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT);
|
|
||||||
assertTrue(zkClient.isConnected());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void closeZk() throws Exception {
|
|
||||||
if (zkClient != null)
|
|
||||||
zkClient.close();
|
|
||||||
zkServer.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue