401651 - Abort request if maxRequestsQueuedPerDestination is reached.

This commit is contained in:
Simone Bordet 2013-02-24 19:52:41 +01:00
parent 2d310ac82e
commit 34d343e260
2 changed files with 61 additions and 2 deletions

View File

@ -175,7 +175,8 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
} }
else else
{ {
throw new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded"); LOG.debug("Max queued exceeded {}", request);
abort(exchange, new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded for " + this));
} }
} }
else else
@ -208,7 +209,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
if (next > maxConnections) if (next > maxConnections)
{ {
LOG.debug("Max connections {} reached for {}", current, this); LOG.debug("Max connections per destination {} exceeded for {}", current, this);
// Try again the idle connections // Try again the idle connections
return idleConnections.poll(); return idleConnections.poll();
} }

View File

@ -19,11 +19,17 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -155,4 +161,56 @@ public class HttpDestinationTest extends AbstractHttpClientServerTest
Assert.assertNull(connection1); Assert.assertNull(connection1);
} }
} }
@Test
public void test_Request_Failed_If_MaxRequestsQueuedPerDestination_Exceeded() throws Exception
{
int maxQueued = 1;
client.setMaxRequestsQueuedPerDestination(maxQueued);
client.setMaxConnectionsPerDestination(1);
// Make one request to open the connection and be sure everything is setup properly
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send();
Assert.assertEquals(200, response.getStatus());
// Send another request that is sent immediately
final CountDownLatch successLatch = new CountDownLatch(1);
final CountDownLatch failureLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onRequestQueued(new Request.QueuedListener()
{
@Override
public void onQueued(Request request)
{
// This request exceeds the maximum queued, should fail
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
Assert.assertThat(result.getRequestFailure(), Matchers.instanceOf(RejectedExecutionException.class));
failureLatch.countDown();
}
});
}
})
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded())
successLatch.countDown();
}
});
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
} }