SOLR-1543: fix thread hangs

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@833272 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2009-11-06 01:55:39 +00:00
parent 942044a292
commit bade20e8b7
1 changed files with 25 additions and 34 deletions

View File

@ -23,11 +23,7 @@ import java.io.OutputStreamWriter;
import java.net.MalformedURLException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -63,8 +59,8 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
final ExecutorService scheduler = Executors.newCachedThreadPool();
final String updateUrl = "/update";
final Queue<Runner> runners;
Lock lock = null; // used to block everything
int threadCount = 1;
volatile CountDownLatch lock = null; // used to block everything
final int threadCount;
public StreamingUpdateSolrServer(String solrServerUrl, int queueSize, int threadCount ) throws MalformedURLException {
super( solrServerUrl );
@ -77,10 +73,10 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
* Opens a connection and sends everything...
*/
class Runner implements Runnable {
final Lock lock = new ReentrantLock();
final Lock runnerLock = new ReentrantLock();
public void run() {
lock.lock();
runnerLock.lock();
// info is ok since this should only happen once for each thread
log.info( "starting runner: {}" , this );
@ -162,7 +158,7 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
runners.remove( this );
}
log.info( "finished: {}" , this );
lock.unlock();
runnerLock.unlock();
}
}
}
@ -191,11 +187,12 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
}
}
if( lock != null ) {
lock.lock(); // keep it from adding new commands while we block
}
try {
CountDownLatch tmpLock = lock;
if( tmpLock != null ) {
tmpLock.await();
}
queue.put( req );
if( runners.isEmpty()
@ -213,11 +210,6 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
log.error( "interuped", e );
throw new IOException( e.getLocalizedMessage() );
}
finally {
if( lock != null ) {
lock.unlock();
}
}
// RETURN A DUMMY result
NamedList<Object> dummy = new NamedList<Object>();
@ -227,20 +219,19 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
public synchronized void blockUntilFinished()
{
if( lock == null ) {
lock = new ReentrantLock();
}
lock.lock();
lock = new CountDownLatch(1);
try {
// Wait until no runners are running
Runner runner = runners.peek();
while( runner != null ) {
runner.lock.lock();
runner.lock.unlock();
runner.runnerLock.lock();
runner.runnerLock.unlock();
runner = runners.peek();
}
lock.unlock();
lock = null;
} finally {
lock.countDown();
lock=null;
}
}
public void handleError( Throwable ex )