add connection count test in case of failure

This commit is contained in:
Xavier Léauté 2014-01-10 12:43:17 -08:00
parent d33aba728a
commit bd9a4a0b60
1 changed files with 42 additions and 13 deletions

View File

@ -17,35 +17,41 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.client.selector; package io.druid.client;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request; import com.metamx.http.client.Request;
import com.metamx.http.client.RequestBuilder; import com.metamx.http.client.RequestBuilder;
import io.druid.client.DirectDruidClient; import io.druid.client.selector.ConnectionCountServerSelectorStrategy;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids; import io.druid.query.Druids;
import io.druid.query.ReflectionQueryToolChestWarehouse; import io.druid.query.ReflectionQueryToolChestWarehouse;
import io.druid.query.Result;
import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert; import junit.framework.Assert;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URL; import java.net.URL;
import java.util.List;
/** public class DirectDruidClientTest
*/
public class ServerSelectorTest
{ {
private HttpClient httpClient; private HttpClient httpClient;
@ -56,10 +62,16 @@ public class ServerSelectorTest
} }
@Test @Test
public void testPick() throws Exception public void testRun() throws Exception
{ {
RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com")); RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"));
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(requestBuilder).atLeastOnce(); EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(requestBuilder).atLeastOnce();
SettableFuture futureException = SettableFuture.create();
SettableFuture<InputStream> futureResult = SettableFuture.create();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(futureResult).times(1);
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(futureException).times(1);
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(SettableFuture.create()).atLeastOnce(); EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(SettableFuture.create()).atLeastOnce();
EasyMock.replay(httpClient); EasyMock.replay(httpClient);
@ -80,13 +92,13 @@ public class ServerSelectorTest
DirectDruidClient client1 = new DirectDruidClient( DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(), new ReflectionQueryToolChestWarehouse(),
new DefaultObjectMapper(new SmileFactory()), new DefaultObjectMapper(),
httpClient, httpClient,
"foo" "foo"
); );
DirectDruidClient client2 = new DirectDruidClient( DirectDruidClient client2 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(), new ReflectionQueryToolChestWarehouse(),
new DefaultObjectMapper(new SmileFactory()), new DefaultObjectMapper(),
httpClient, httpClient,
"foo2" "foo2"
); );
@ -104,11 +116,28 @@ public class ServerSelectorTest
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
client1.run(query); Sequence s1 = client1.run(query);
client1.run(query); Assert.assertEquals(1, client1.getNumOpenConnections());
client1.run(query);
Assert.assertTrue(client1.getNumOpenConnections() == 3); // simulate read timeout
Sequence s2 = client1.run(query);
Assert.assertEquals(2, client1.getNumOpenConnections());
futureException.setException(new ReadTimeoutException());
Assert.assertEquals(1, client1.getNumOpenConnections());
// subsequent connections should work
Sequence s3 = client1.run(query);
Sequence s4 = client1.run(query);
Sequence s5 = client1.run(query);
Assert.assertTrue(client1.getNumOpenConnections() == 4);
// produce result for first connection
futureResult.set(new ByteArrayInputStream("[{\"timestamp\":\"2014-01-01T01:02:03Z\", \"result\": 42.0}]".getBytes()));
List<Result> results = Sequences.toList(s1, Lists.<Result>newArrayList());
Assert.assertEquals(1, results.size());
Assert.assertEquals(new DateTime("2014-01-01T01:02:03Z"), results.get(0).getTimestamp());
Assert.assertEquals(3, client1.getNumOpenConnections());
client2.run(query); client2.run(query);
client2.run(query); client2.run(query);