461643 - HttpContent.advance() race.

Fixed by correctly synchronizing on ContentProvider.iterator.hasNext()
and ContentProvider.iterator.next() for those ContentProviders that
require it, such as DeferredContentProvider.
This commit is contained in:
Simone Bordet 2015-03-07 23:13:21 +01:00
parent 66e1f0a863
commit 7c915bcba2
6 changed files with 439 additions and 143 deletions

View File

@ -58,7 +58,7 @@ import org.eclipse.jetty.util.log.Logger;
* {@link HttpContent} may not have content, if the related {@link ContentProvider} is {@code null}, and this
* is reflected by {@link #hasContent()}.
* <p />
* {@link HttpContent} may have {@link DeferredContentProvider deferred content}, in which case {@link #advance()}
* {@link HttpContent} may have {@link AsyncContentProvider deferred content}, in which case {@link #advance()}
* moves the cursor to a position that provides {@code null} {@link #getByteBuffer() buffer} and
* {@link #getContent() content}. When the deferred content is available, a further call to {@link #advance()}
* will move the cursor to a position that provides non {@code null} buffer and content.
@ -124,7 +124,34 @@ public class HttpContent implements Callback, Closeable
*/
public boolean advance()
{
if (isLast())
boolean advanced;
boolean hasNext;
ByteBuffer bytes;
if (iterator instanceof Synchronizable)
{
synchronized (((Synchronizable)iterator).getLock())
{
advanced = iterator.hasNext();
bytes = advanced ? iterator.next() : null;
hasNext = advanced && iterator.hasNext();
}
}
else
{
advanced = iterator.hasNext();
bytes = advanced ? iterator.next() : null;
hasNext = advanced && iterator.hasNext();
}
if (advanced)
{
buffer = bytes;
content = bytes == null ? null : bytes.slice();
if (LOG.isDebugEnabled())
LOG.debug("Advanced content to {} chunk {}", hasNext ? "next" : "last", bytes);
return bytes != null;
}
else
{
if (content != AFTER)
{
@ -134,14 +161,6 @@ public class HttpContent implements Callback, Closeable
}
return false;
}
else
{
ByteBuffer buffer = this.buffer = iterator.next();
if (LOG.isDebugEnabled())
LOG.debug("Advanced content to {} chunk {}", isLast() ? "last" : "next", buffer);
content = buffer == null ? null : buffer.slice();
return buffer != null;
}
}
/**

View File

@ -144,6 +144,8 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
case EXPECTING_WITH_CONTENT:
case PROCEEDING_WITH_CONTENT:
case WAITING:
case COMPLETED:
case FAILED:
{
if (LOG.isDebugEnabled())
LOG.debug("Deferred content available, {}", current);
@ -151,7 +153,8 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
default:
{
throw illegalSenderState(current);
illegalSenderState(current);
return;
}
}
}
@ -169,8 +172,26 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
SenderState newSenderState = SenderState.SENDING;
if (expects100Continue(request))
newSenderState = content.hasContent() ? SenderState.EXPECTING_WITH_CONTENT : SenderState.EXPECTING;
if (!updateSenderState(SenderState.IDLE, newSenderState))
throw illegalSenderState(SenderState.IDLE);
out: while (true)
{
SenderState current = senderState.get();
switch (current)
{
case IDLE:
case COMPLETED:
{
if (updateSenderState(current, newSenderState))
break out;
break;
}
default:
{
illegalSenderState(current);
return;
}
}
}
// Setting the listener may trigger calls to onContent() by other
// threads so we must set it only after the sender state has been updated
@ -424,16 +445,19 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
protected void reset()
{
HttpContent content = this.content;
this.content = null;
content.close();
content = null;
senderState.set(SenderState.IDLE);
senderState.set(SenderState.COMPLETED);
}
protected void dispose()
{
HttpContent content = this.content;
this.content = null;
if (content != null)
content.close();
senderState.set(SenderState.FAILED);
}
public void proceed(HttpExchange exchange, Throwable failure)
@ -482,16 +506,23 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
case WAITING:
{
// We received the 100 Continue, now send the content if any.
if (!updateSenderState(current, SenderState.SENDING))
throw illegalSenderState(current);
if (updateSenderState(current, SenderState.SENDING))
{
if (LOG.isDebugEnabled())
LOG.debug("Proceeding while waiting");
contentCallback.iterate();
return;
}
break;
}
case FAILED:
{
return;
}
default:
{
throw illegalSenderState(current);
illegalSenderState(current);
return;
}
}
}
@ -518,9 +549,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return updated;
}
private RuntimeException illegalSenderState(SenderState current)
private void illegalSenderState(SenderState current)
{
return new IllegalStateException("Expected " + current + " found " + senderState.get() + " instead");
anyToFailure(new IllegalStateException("Expected " + current + " found " + senderState.get() + " instead"));
}
@Override
@ -609,7 +640,15 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
/**
* {@link HttpSender} is sending the headers, while 100 Continue has arrived, and deferred content is available to be sent
*/
PROCEEDING_WITH_CONTENT
PROCEEDING_WITH_CONTENT,
/**
* {@link HttpSender} has finished to send the request
*/
COMPLETED,
/**
* {@link HttpSender} has failed to send the request
*/
FAILED
}
private class CommitCallback implements Callback
@ -619,11 +658,13 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
try
{
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.succeeded();
process();
}
// Catch-all for runtime exceptions
catch (Exception x)
catch (Throwable x)
{
anyToFailure(x);
}
@ -632,6 +673,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
@Override
public void failed(Throwable failure)
{
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.failed(failure);
anyToFailure(failure);
}
@ -647,6 +691,8 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return;
HttpContent content = HttpSender.this.content;
if (content == null)
return;
if (!content.hasContent())
{
@ -709,9 +755,14 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
updateSenderState(current, SenderState.SENDING);
break;
}
case FAILED:
{
return;
}
default:
{
throw illegalSenderState(current);
illegalSenderState(current);
return;
}
}
}
@ -729,6 +780,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return Action.IDLE;
HttpContent content = HttpSender.this.content;
if (content == null)
return Action.IDLE;
while (true)
{
boolean advanced = content.advance();
@ -748,7 +802,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return Action.IDLE;
}
SenderState current = HttpSender.this.senderState.get();
SenderState current = senderState.get();
switch (current)
{
case SENDING:
@ -768,7 +822,8 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
default:
{
throw illegalSenderState(current);
illegalSenderState(current);
return Action.IDLE;
}
}
}
@ -777,15 +832,24 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
@Override
public void succeeded()
{
ByteBuffer buffer = content.getContent();
someToContent(getHttpExchange().getRequest(), buffer);
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.succeeded();
ByteBuffer buffer = content.getContent();
someToContent(exchange.getRequest(), buffer);
super.succeeded();
}
@Override
public void onCompleteFailure(Throwable failure)
{
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.failed(failure);
anyToFailure(failure);
}
@ -803,16 +867,22 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
@Override
public void succeeded()
{
content.succeeded();
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.succeeded();
someToSuccess(exchange);
}
@Override
public void failed(Throwable failure)
{
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.failed(failure);
anyToFailure(failure);
}

View File

@ -0,0 +1,45 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
/**
* <p>Implementations of this interface expose a lock object
* via {@link #getLock()} so that callers can synchronize
* externally on that lock:</p>
* <pre>
* if (iterator instanceof Synchronizable)
* {
* Object element = null;
* synchronized (((Synchronizable)iterator).getLock())
* {
* if (iterator.hasNext())
* element = iterator.next();
* }
* }
* </pre>
* <p>In the example above, the calls to {@code hasNext()} and
* {@code next()} are performed "atomically".</p>
*/
public interface Synchronizable
{
/**
* @return the lock object to synchronize on
*/
public Object getLock();
}

View File

@ -27,11 +27,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.AsyncContentProvider;
import org.eclipse.jetty.client.Synchronizable;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
@ -90,7 +90,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
private static final Chunk CLOSE = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.Adapter.INSTANCE);
private final Object lock = this;
private final Queue<Chunk> chunks = new ArrayQueue<>(4, 64, lock);
private final ArrayQueue<Chunk> chunks = new ArrayQueue<>(4, 64, lock);
private final AtomicReference<Listener> listener = new AtomicReference<>();
private final DeferredContentProviderIterator iterator = new DeferredContentProviderIterator();
private final AtomicBoolean closed = new AtomicBoolean();
@ -241,7 +241,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
return iterator;
}
private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback
private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback, Synchronizable
{
private Chunk current;
@ -261,7 +261,12 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
{
Chunk chunk = current = chunks.poll();
if (chunk == CLOSE)
{
// Slow path: reinsert the CLOSE chunk
// so that hasNext() works correctly.
chunks.add(0, CLOSE);
throw new NoSuchElementException();
}
return chunk == null ? null : chunk.buffer;
}
}
@ -308,6 +313,12 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
for (Chunk chunk : chunks)
chunk.callback.failed(x);
}
@Override
public Object getLock()
{
return lock;
}
}
public static class Chunk

View File

@ -0,0 +1,156 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.util;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.Callback;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class DeferredContentProviderTest
{
private ExecutorService executor;
@Before
public void prepare() throws Exception
{
executor = Executors.newCachedThreadPool();
}
@After
public void dispose() throws Exception
{
executor.shutdownNow();
}
@Test
public void testWhenEmptyFlushDoesNotBlock() throws Exception
{
final DeferredContentProvider provider = new DeferredContentProvider();
Future<?> task = executor.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
provider.flush();
return null;
}
});
Assert.assertTrue(await(task, 5, TimeUnit.SECONDS));
}
@Test
public void testOfferFlushBlocksUntilSucceeded() throws Exception
{
final DeferredContentProvider provider = new DeferredContentProvider();
Iterator<ByteBuffer> iterator = provider.iterator();
provider.offer(ByteBuffer.allocate(0));
Future<?> task = executor.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
provider.flush();
return null;
}
});
// Wait until flush() blocks.
Assert.assertFalse(await(task, 1, TimeUnit.SECONDS));
// Consume the content and succeed the callback.
iterator.next();
((Callback)iterator).succeeded();
// Flush should return.
Assert.assertTrue(await(task, 5, TimeUnit.SECONDS));
}
@Test
public void testCloseFlushDoesNotBlock() throws Exception
{
final DeferredContentProvider provider = new DeferredContentProvider();
provider.close();
Future<?> task = executor.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
provider.flush();
return null;
}
});
// Wait until flush() blocks.
Assert.assertTrue(await(task, 5, TimeUnit.SECONDS));
}
@Test
public void testCloseNextHasNextReturnsFalse() throws Exception
{
DeferredContentProvider provider = new DeferredContentProvider();
Iterator<ByteBuffer> iterator = provider.iterator();
provider.close();
Assert.assertFalse(iterator.hasNext());
try
{
iterator.next();
Assert.fail();
}
catch (NoSuchElementException x)
{
// Expected
}
Assert.assertFalse(iterator.hasNext());
}
private boolean await(Future<?> task, long time, TimeUnit unit) throws Exception
{
try
{
task.get(time, unit);
return true;
}
catch (TimeoutException x)
{
return false;
}
}
}

View File

@ -19,15 +19,11 @@
package org.eclipse.jetty.proxy;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@ -38,8 +34,6 @@ import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.proxy.AbstractProxyServlet;
import org.eclipse.jetty.proxy.AsyncProxyServlet;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
@ -50,36 +44,37 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
public class Async502Loop
@RunWith(Parameterized.class)
public class AsyncProxyServletLoadTest
{
private static final Logger LOG = Log.getLogger(Async502Loop.class);
private static final String PROXIED_HEADER = "X-Proxied";
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> data()
{
return Arrays.asList(new Object[][]{
{AsyncProxyServlet.class},
{AsyncMiddleManServlet.class}
});
}
public static void main(String[] args)
{
try
{
new Async502Loop().loop();
}
catch (Throwable t)
{
t.printStackTrace(System.err);
}
}
private static final Logger LOG = Log.getLogger(AsyncProxyServletLoadTest.class);
private static final String PROXIED_HEADER = "X-Proxied";
private HttpClient client;
private Server proxy;
private ServerConnector proxyConnector;
private ServletContextHandler proxyContext;
private AbstractProxyServlet proxyServlet;
private Server server;
private ServerConnector serverConnector;
public Async502Loop()
public AsyncProxyServletLoadTest(Class<?> proxyServletClass) throws Exception
{
proxyServlet = new AsyncProxyServlet();
// proxyServlet = new AsyncMiddleManServlet();
proxyServlet = (AbstractProxyServlet)proxyServletClass.newInstance();
}
private void startServer(HttpServlet servlet) throws Exception
@ -90,19 +85,14 @@ public class Async502Loop
serverConnector = new ServerConnector(server);
server.addConnector(serverConnector);
ServletContextHandler appCtx = new ServletContextHandler(server,"/",true,false);
ServletContextHandler appCtx = new ServletContextHandler(server, "/", true, false);
ServletHolder appServletHolder = new ServletHolder(servlet);
appCtx.addServlet(appServletHolder,"/*");
appCtx.addServlet(appServletHolder, "/*");
server.start();
}
private void startProxy() throws Exception
{
startProxy(new HashMap<String, String>());
}
private void startProxy(Map<String, String> initParams) throws Exception
{
QueuedThreadPool proxyPool = new QueuedThreadPool();
proxyPool.setName("proxy");
@ -111,36 +101,28 @@ public class Async502Loop
HttpConfiguration configuration = new HttpConfiguration();
configuration.setSendDateHeader(false);
configuration.setSendServerVersion(false);
String value = initParams.get("outputBufferSize");
if (value != null)
configuration.setOutputBufferSize(Integer.valueOf(value));
proxyConnector = new ServerConnector(proxy,new HttpConnectionFactory(configuration));
proxyConnector = new ServerConnector(proxy, new HttpConnectionFactory(configuration));
proxy.addConnector(proxyConnector);
proxyContext = new ServletContextHandler(proxy,"/",true,false);
ServletContextHandler proxyContext = new ServletContextHandler(proxy, "/", true, false);
ServletHolder proxyServletHolder = new ServletHolder(proxyServlet);
proxyServletHolder.setInitParameters(initParams);
proxyContext.addServlet(proxyServletHolder,"/*");
proxyContext.addServlet(proxyServletHolder, "/*");
proxy.start();
}
private void startClient() throws Exception
{
client = prepareClient();
}
private HttpClient prepareClient() throws Exception
{
QueuedThreadPool clientPool = new QueuedThreadPool();
clientPool.setName("client");
HttpClient result = new HttpClient();
result.setExecutor(clientPool);
result.getProxyConfiguration().getProxies().add(new HttpProxy("localhost",proxyConnector.getLocalPort()));
result.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyConnector.getLocalPort()));
result.start();
return result;
client = result;
}
@After
public void dispose() throws Exception
{
client.stop();
@ -148,60 +130,8 @@ public class Async502Loop
server.stop();
}
private static class ClientLoop implements Runnable
{
private final CountDownLatch active;
private final AtomicBoolean ok;
private final HttpClient client;
private final String host;
private final int port;
public ClientLoop(CountDownLatch activeClientLatch, AtomicBoolean atomicOk, HttpClient client, String serverHost, int serverPort)
{
this.active = activeClientLatch;
this.ok = atomicOk;
this.client = client;
this.host = serverHost;
this.port = serverPort;
}
@Override
public void run()
{
String threadName = Thread.currentThread().getName();
LOG.info("Starting thread {}",threadName);
try
{
while (ok.get())
{
byte[] content = new byte[1024];
new Random().nextBytes(content);
ContentResponse response = client.newRequest(host,port).method(HttpMethod.POST).content(new BytesContentProvider(content))
.timeout(5,TimeUnit.SECONDS).send();
if (response.getStatus() != 200)
{
LOG.warn("Got response <{}>, expecting <{}>",response.getStatus(),200);
// allow all ClientLoops to finish
ok.set(false);
}
}
}
catch (InterruptedException | TimeoutException | ExecutionException e)
{
LOG.warn("Error processing request",e);
ok.set(false);
}
finally
{
LOG.info("Shutting down thread {}",threadName);
active.countDown();
}
}
}
@SuppressWarnings("serial")
private void loop() throws Exception
@Test
public void test() throws Exception
{
startServer(new HttpServlet()
{
@ -209,33 +139,98 @@ public class Async502Loop
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
if (req.getHeader("Via") != null)
resp.addHeader(PROXIED_HEADER,"true");
IO.copy(req.getInputStream(),resp.getOutputStream());
resp.addHeader(PROXIED_HEADER, "true");
IO.copy(req.getInputStream(), resp.getOutputStream());
}
});
startProxy();
startClient();
// Number of clients to simulate
int clientCount = 5;
int clientCount = Runtime.getRuntime().availableProcessors();
// Latch for number of clients still active (used to terminate test)
final CountDownLatch activeClientLatch = new CountDownLatch(clientCount);
// Atomic Boolean to track that its OK to still continue looping.
// When this goes false, that means one of the client threads has
// encountered an error condition, and should allow all remaining
// client threads to finish cleanly.
final AtomicBoolean atomicOk = new AtomicBoolean(true);
final AtomicBoolean success = new AtomicBoolean(true);
int iterations = 1000;
// Start clients
for (int i = 0; i < clientCount; i++)
{
ClientLoop r = new ClientLoop(activeClientLatch,atomicOk,client,"localhost",serverConnector.getLocalPort());
ClientLoop r = new ClientLoop(activeClientLatch, success, client, "localhost", serverConnector.getLocalPort(), iterations);
String name = "client-" + i;
Thread thread = new Thread(r,name);
Thread thread = new Thread(r, name);
thread.start();
}
activeClientLatch.await();
dispose();
Assert.assertTrue(activeClientLatch.await(clientCount * iterations * 10, TimeUnit.MILLISECONDS));
Assert.assertTrue(success.get());
}
private static class ClientLoop implements Runnable
{
private final CountDownLatch active;
private final AtomicBoolean success;
private final HttpClient client;
private final String host;
private final int port;
private int iterations;
public ClientLoop(CountDownLatch activeClientLatch, AtomicBoolean success, HttpClient client, String serverHost, int serverPort, int iterations)
{
this.active = activeClientLatch;
this.success = success;
this.client = client;
this.host = serverHost;
this.port = serverPort;
this.iterations = iterations;
}
@Override
public void run()
{
String threadName = Thread.currentThread().getName();
LOG.info("Starting thread {}", threadName);
try
{
while (success.get())
{
--iterations;
byte[] content = new byte[1024];
new Random().nextBytes(content);
ContentResponse response = client.newRequest(host, port).method(HttpMethod.POST).content(new BytesContentProvider(content))
.timeout(5, TimeUnit.SECONDS).send();
if (response.getStatus() != 200)
{
LOG.warn("Got response <{}>, expecting <{}>", response.getStatus(), 200);
// allow all ClientLoops to finish
success.set(false);
}
else
{
if (iterations == 0)
break;
}
}
}
catch (Throwable x)
{
LOG.warn("Error processing request", x);
success.set(false);
}
finally
{
LOG.info("Shutting down thread {}", threadName);
active.countDown();
}
}
}
}