SOLR-1711: fix hang when queue is full but there are no runners

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1063869 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2011-01-26 20:57:05 +00:00
parent d494562133
commit 326ab7d577
2 changed files with 21 additions and 12 deletions

View File

@ -516,7 +516,8 @@ Bug Fixes
* SOLR-1711: SolrJ - StreamingUpdateSolrServer had a race condition that
could halt the streaming of documents. The original patch to fix this
(never officially released) introduced another hanging bug due to
connections not being released. (Attila Babo, Erik Hetzner via yonik)
connections not being released.
(Attila Babo, Erik Hetzner, Johannes Tuchscherer via yonik)
* SOLR-1748, SOLR-1747, SOLR-1746, SOLR-1745, SOLR-1744: Streams and Readers
retrieved from ContentStreams are not closed in various places, resulting

View File

@ -173,12 +173,20 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
}
catch (Throwable e) {
handleError( e );
}
}
finally {
// remove it from the list of running things...
// remove it from the list of running things unless we are the last runner and the queue is full...
// in which case, the next queue.put() would block and there would be no runners to handle it.
synchronized (runners) {
runners.remove( this );
if (runners.size() == 1 && queue.remainingCapacity() == 0) {
// keep this runner alive
scheduler.execute(this);
} else {
runners.remove( this );
}
}
log.info( "finished: {}" , this );
runnerLock.unlock();
}
@ -208,7 +216,7 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
return super.request( request );
}
}
try {
CountDownLatch tmpLock = lock;
if( tmpLock != null ) {
@ -216,18 +224,18 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
}
queue.put( req );
synchronized( runners ) {
if( runners.isEmpty()
|| (queue.remainingCapacity() < queue.size()
&& runners.size() < threadCount) )
{
synchronized( runners ) {
if( runners.isEmpty()
|| (queue.remainingCapacity() < queue.size()
&& runners.size() < threadCount) )
{
Runner r = new Runner();
scheduler.execute( r );
runners.add( r );
}
}
}
}
catch (InterruptedException e) {
log.error( "interrupted", e );
throw new IOException( e.getLocalizedMessage() );