Improved Connection Count server select strategy (#15975)

Updated the Direct Druid Client so as to make Connection Count Server Selector Strategy work more efficiently.
If creating connection to a node is slow, then that slowness wouldn't be accounted for if we count the open connections after sending the request. So we increment the counter and then send the request.
This commit is contained in:
Sree Charan Manamala 2024-03-04 15:02:32 +05:30 committed by GitHub
parent b3015bd7ce
commit 820febf38c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 69 additions and 14 deletions

View File

@ -455,22 +455,29 @@ public class DirectDruidClient<T> implements QueryRunner<T>
throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url));
}
future = httpClient.go(
new Request(
HttpMethod.POST,
new URL(url)
).setContent(objectMapper.writeValueAsBytes(Queries.withTimeout(query, timeLeft)))
.setHeader(
HttpHeaders.Names.CONTENT_TYPE,
isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
),
responseHandler,
Duration.millis(timeLeft)
);
// increment is moved up so that if future initialization is queued by some other process,
// we can increment the count earlier so that we can route the request to a different server
openConnections.getAndIncrement();
try {
future = httpClient.go(
new Request(
HttpMethod.POST,
new URL(url)
).setContent(objectMapper.writeValueAsBytes(Queries.withTimeout(query, timeLeft)))
.setHeader(
HttpHeaders.Names.CONTENT_TYPE,
isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
),
responseHandler,
Duration.millis(timeLeft)
);
}
catch (Exception e) {
openConnections.getAndDecrement();
throw e;
}
queryWatcher.registerQueryFuture(query, future);
openConnections.getAndIncrement();
Futures.addCallback(
future,
new FutureCallback<InputStream>()

View File

@ -19,6 +19,8 @@
package org.apache.druid.client;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -27,6 +29,7 @@ import org.apache.druid.client.selector.ConnectionCountServerSelectorStrategy;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.QueryableDruidServer;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@ -38,6 +41,7 @@ import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerTestHelper;
@ -97,6 +101,7 @@ public class DirectDruidClientTest
@Before
public void setup()
{
NullHandling.initializeForTests();
httpClient = EasyMock.createMock(HttpClient.class);
serverSelector = new ServerSelector(
dataSegment,
@ -427,4 +432,47 @@ public class DirectDruidClientTest
Assert.assertEquals(hostName, actualException.getHost());
EasyMock.verify(httpClient);
}
@Test
public void testConnectionCountAfterException() throws JsonProcessingException
{
ObjectMapper mockObjectMapper = EasyMock.createMock(ObjectMapper.class);
EasyMock.expect(mockObjectMapper.writeValueAsBytes(Query.class))
.andThrow(new JsonProcessingException("Error")
{
});
DirectDruidClient client2 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
mockObjectMapper,
httpClient,
"http",
hostName,
new NoopServiceEmitter(),
queryCancellationExecutor
);
QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
new DruidServer(
"test1",
"localhost",
null,
0,
ServerType.HISTORICAL,
DruidServer.DEFAULT_TIER,
0
),
client2
);
serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment());
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
TimeBoundaryQuery finalQuery = query;
Assert.assertThrows(RuntimeException.class, () -> client2.run(QueryPlus.wrap(finalQuery)));
Assert.assertEquals(0, client2.getNumOpenConnections());
}
}