From 820febf38c062d35876ddeec4c6887bd0ec51d03 Mon Sep 17 00:00:00 2001 From: Sree Charan Manamala <155449160+sreemanamala@users.noreply.github.com> Date: Mon, 4 Mar 2024 15:02:32 +0530 Subject: [PATCH] Improved Connection Count server select strategy (#15975) Updated the Direct Druid Client so as to make Connection Count Server Selector Strategy work more efficiently. If creating connection to a node is slow, then that slowness wouldn't be accounted for if we count the open connections after sending the request. So we increment the counter and then send the request. --- .../druid/client/DirectDruidClient.java | 35 ++++++++------ .../druid/client/DirectDruidClientTest.java | 48 +++++++++++++++++++ 2 files changed, 69 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 9a93c8bacd0..6a502110058 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -455,22 +455,29 @@ public class DirectDruidClient implements QueryRunner throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url)); } - future = httpClient.go( - new Request( - HttpMethod.POST, - new URL(url) - ).setContent(objectMapper.writeValueAsBytes(Queries.withTimeout(query, timeLeft))) - .setHeader( - HttpHeaders.Names.CONTENT_TYPE, - isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON - ), - responseHandler, - Duration.millis(timeLeft) - ); + // increment is moved up so that if future initialization is queued by some other process, + // we can increment the count earlier so that we can route the request to a different server + openConnections.getAndIncrement(); + try { + future = httpClient.go( + new Request( + HttpMethod.POST, + new URL(url) + ).setContent(objectMapper.writeValueAsBytes(Queries.withTimeout(query, timeLeft))) + .setHeader( + HttpHeaders.Names.CONTENT_TYPE, + isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON + ), + responseHandler, + Duration.millis(timeLeft) + ); + } + catch (Exception e) { + openConnections.getAndDecrement(); + throw e; + } queryWatcher.registerQueryFuture(query, future); - - openConnections.getAndIncrement(); Futures.addCallback( future, new FutureCallback() diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index 4c99d2b5409..4fffcd6fd35 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -19,6 +19,8 @@ package org.apache.druid.client; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -27,6 +29,7 @@ import org.apache.druid.client.selector.ConnectionCountServerSelectorStrategy; import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; import org.apache.druid.client.selector.QueryableDruidServer; import org.apache.druid.client.selector.ServerSelector; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -38,6 +41,7 @@ import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.HttpResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.query.Druids; +import org.apache.druid.query.Query; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunnerTestHelper; @@ -97,6 +101,7 @@ public class DirectDruidClientTest @Before public void setup() { + NullHandling.initializeForTests(); httpClient = EasyMock.createMock(HttpClient.class); serverSelector = new ServerSelector( dataSegment, @@ -427,4 +432,47 @@ public class DirectDruidClientTest Assert.assertEquals(hostName, actualException.getHost()); EasyMock.verify(httpClient); } + + @Test + public void testConnectionCountAfterException() throws JsonProcessingException + { + ObjectMapper mockObjectMapper = EasyMock.createMock(ObjectMapper.class); + EasyMock.expect(mockObjectMapper.writeValueAsBytes(Query.class)) + .andThrow(new JsonProcessingException("Error") + { + }); + + DirectDruidClient client2 = new DirectDruidClient( + new ReflectionQueryToolChestWarehouse(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + mockObjectMapper, + httpClient, + "http", + hostName, + new NoopServiceEmitter(), + queryCancellationExecutor + ); + + QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer( + new DruidServer( + "test1", + "localhost", + null, + 0, + ServerType.HISTORICAL, + DruidServer.DEFAULT_TIER, + 0 + ), + client2 + ); + + serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment()); + + TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); + query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); + + TimeBoundaryQuery finalQuery = query; + Assert.assertThrows(RuntimeException.class, () -> client2.run(QueryPlus.wrap(finalQuery))); + Assert.assertEquals(0, client2.getNumOpenConnections()); + } }