From bade20e8b73e7d51dfc34f9700148bf0ff9f2247 Mon Sep 17 00:00:00 2001 From: Yonik Seeley Date: Fri, 6 Nov 2009 01:55:39 +0000 Subject: [PATCH] SOLR-1543: fix thread hangs git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@833272 13f79535-47bb-0310-9956-ffa450edef68 --- .../solrj/impl/StreamingUpdateSolrServer.java | 59 ++++++++----------- 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java b/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java index cc57ab0c57d..cffdfde95ed 100644 --- a/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java +++ b/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java @@ -23,11 +23,7 @@ import java.io.OutputStreamWriter; import java.net.MalformedURLException; import java.util.LinkedList; import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -63,8 +59,8 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer final ExecutorService scheduler = Executors.newCachedThreadPool(); final String updateUrl = "/update"; final Queue runners; - Lock lock = null; // used to block everything - int threadCount = 1; + volatile CountDownLatch lock = null; // used to block everything + final int threadCount; public StreamingUpdateSolrServer(String solrServerUrl, int queueSize, int threadCount ) throws MalformedURLException { super( solrServerUrl ); @@ -77,10 +73,10 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer * Opens a connection and sends everything... */ class Runner implements Runnable { - final Lock lock = new ReentrantLock(); - + final Lock runnerLock = new ReentrantLock(); + public void run() { - lock.lock(); + runnerLock.lock(); // info is ok since this should only happen once for each thread log.info( "starting runner: {}" , this ); @@ -162,7 +158,7 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer runners.remove( this ); } log.info( "finished: {}" , this ); - lock.unlock(); + runnerLock.unlock(); } } } @@ -191,11 +187,12 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer } } - - if( lock != null ) { - lock.lock(); // keep it from adding new commands while we block - } try { + CountDownLatch tmpLock = lock; + if( tmpLock != null ) { + tmpLock.await(); + } + queue.put( req ); if( runners.isEmpty() @@ -213,34 +210,28 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer log.error( "interuped", e ); throw new IOException( e.getLocalizedMessage() ); } - finally { - if( lock != null ) { - lock.unlock(); - } - } // RETURN A DUMMY result NamedList dummy = new NamedList(); dummy.add( "NOTE", "the request is processed in a background stream" ); return dummy; } - + public synchronized void blockUntilFinished() { - if( lock == null ) { - lock = new ReentrantLock(); + lock = new CountDownLatch(1); + try { + // Wait until no runners are running + Runner runner = runners.peek(); + while( runner != null ) { + runner.runnerLock.lock(); + runner.runnerLock.unlock(); + runner = runners.peek(); + } + } finally { + lock.countDown(); + lock=null; } - lock.lock(); - - // Wait until no runners are running - Runner runner = runners.peek(); - while( runner != null ) { - runner.lock.lock(); - runner.lock.unlock(); - runner = runners.peek(); - } - lock.unlock(); - lock = null; } public void handleError( Throwable ex )