Report zero values instead of unknown for empty ingest queries (#15674)

MSQ now allows empty ingest queries by default. For such queries that don't generate any output rows, the query counters in the async status result object/task report don't contain numTotalRows and totalSizeInBytes. These properties when not set/undefined can be confusing to API clients. For example, the web-console treats it as unknown values.

This patch fixes the counters by explicitly reporting them as 0 instead of null for empty ingest queries.
This commit is contained in:
Abhishek Radhakrishnan 2024-01-17 02:56:10 -08:00 committed by GitHub
parent 8a43db9395
commit c27f5bf52f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 230 additions and 63 deletions

View File

@ -172,31 +172,22 @@ public class SqlStatementResourceHelper
for (CounterSnapshots counterSnapshots : workerCounters.values()) {
QueryCounterSnapshot queryCounterSnapshot = counterSnapshots.getMap()
.getOrDefault("segmentGenerationProgress", null);
if (queryCounterSnapshot != null && queryCounterSnapshot instanceof SegmentGenerationProgressCounter.Snapshot) {
if (queryCounterSnapshot instanceof SegmentGenerationProgressCounter.Snapshot) {
rows += ((SegmentGenerationProgressCounter.Snapshot) queryCounterSnapshot).getRowsPushed();
}
}
if (rows != 0L) {
return Optional.of(ImmutableList.of(new PageInformation(0, rows, null)));
} else {
return Optional.empty();
}
return Optional.of(ImmutableList.of(new PageInformation(0, rows, null)));
} else if (msqDestination instanceof TaskReportMSQDestination) {
long rows = 0L;
long size = 0L;
for (CounterSnapshots counterSnapshots : workerCounters.values()) {
QueryCounterSnapshot queryCounterSnapshot = counterSnapshots.getMap().getOrDefault("output", null);
if (queryCounterSnapshot != null && queryCounterSnapshot instanceof ChannelCounters.Snapshot) {
if (queryCounterSnapshot instanceof ChannelCounters.Snapshot) {
rows += Arrays.stream(((ChannelCounters.Snapshot) queryCounterSnapshot).getRows()).sum();
size += Arrays.stream(((ChannelCounters.Snapshot) queryCounterSnapshot).getBytes()).sum();
}
}
if (rows != 0L) {
return Optional.of(ImmutableList.of(new PageInformation(0, rows, size)));
} else {
return Optional.empty();
}
return Optional.of(ImmutableList.of(new PageInformation(0, rows, size)));
} else if (msqDestination instanceof DurableStorageMSQDestination) {
return populatePagesForDurableStorageDestination(finalStage, workerCounters);
@ -221,7 +212,6 @@ public class SqlStatementResourceHelper
throw DruidException.defensive("Expected worker count to be set for stage[%d]", finalStage);
}
List<PageInformation> pages = new ArrayList<>();
for (int partitionNumber = 0; partitionNumber < totalPartitions; partitionNumber++) {
for (int workerNumber = 0; workerNumber < totalWorkerCount; workerNumber++) {
@ -230,7 +220,7 @@ public class SqlStatementResourceHelper
if (workerCounter != null && workerCounter.getMap() != null) {
QueryCounterSnapshot channelCounters = workerCounter.getMap().get("output");
if (channelCounters != null && channelCounters instanceof ChannelCounters.Snapshot) {
if (channelCounters instanceof ChannelCounters.Snapshot) {
long rows = 0L;
long size = 0L;

View File

@ -179,6 +179,65 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
);
}
@Test
public void emptyInsert()
{
Response response = resource.doPost(new SqlQuery(
"insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time < TIMESTAMP '1971-01-01 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1",
null,
false,
false,
false,
ImmutableMap.<String, Object>builder()
.putAll(defaultAsyncContext())
.build(),
null
), SqlStatementResourceTest.makeOkRequest());
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
SqlStatementResult actual = (SqlStatementResult) response.getEntity();
SqlStatementResult expected = new SqlStatementResult(
actual.getQueryId(),
SqlStatementState.SUCCESS,
MSQTestOverlordServiceClient.CREATED_TIME,
null,
MSQTestOverlordServiceClient.DURATION,
new ResultSetInformation(0L, 0L, null, "foo1", null, null),
null
);
Assert.assertEquals(expected, actual);
}
@Test
public void emptyReplace()
{
Response response = resource.doPost(new SqlQuery(
"replace into foo1 overwrite all select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time < TIMESTAMP '1971-01-01 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1",
null,
false,
false,
false,
ImmutableMap.<String, Object>builder()
.putAll(defaultAsyncContext())
.build(),
null
), SqlStatementResourceTest.makeOkRequest());
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
SqlStatementResult actual = (SqlStatementResult) response.getEntity();
SqlStatementResult expected = new SqlStatementResult(
actual.getQueryId(),
SqlStatementState.SUCCESS,
MSQTestOverlordServiceClient.CREATED_TIME,
null,
MSQTestOverlordServiceClient.DURATION,
new ResultSetInformation(0L, 0L, null, "foo1", null, null),
null
);
Assert.assertEquals(expected, actual);
}
@Test
public void insertCannotBeEmptyFaultTest()
@ -433,7 +492,6 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
rows.add(ImmutableList.of(1466985600000L, "GiftBot"));
rows.add(ImmutableList.of(1466985600000L, "GiftBot"));
Assert.assertEquals(rows, SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
null,

View File

@ -1404,8 +1404,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
);
rows.addAll(new FrameChannelSequence(inputChannelFactory.openChannel(
finalStage.getId(),
pageInformation.getWorker(),
pageInformation.getPartition()
pageInformation.getWorker() == null ? 0 : pageInformation.getWorker(),
pageInformation.getPartition() == null ? 0 : pageInformation.getPartition()
)).flatMap(frame -> SqlStatementResourceHelper.getResultSequence(
msqControllerTask,
finalStage,

View File

@ -23,11 +23,13 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.frame.Frame;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.indexing.report.MSQStatusReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
@ -38,6 +40,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -46,9 +49,6 @@ import java.util.TreeMap;
public class SqlStatementResourceHelperTest
{
private static final Logger log = new Logger(SqlStatementResourceHelperTest.class);
@Test
public void testDistinctPartitionsOnEachWorker()
{
@ -83,7 +83,7 @@ public class SqlStatementResourceHelperTest
payload,
DurableStorageMSQDestination.instance()
);
validatePages(pages.get(), createValidationMap(worker0, worker1, worker2));
validatePages(pages.get(), getExpectedPageInformationList(worker0, worker1, worker2));
}
@Test
@ -122,7 +122,7 @@ public class SqlStatementResourceHelperTest
payload,
DurableStorageMSQDestination.instance()
);
validatePages(pages.get(), createValidationMap(worker0, worker1, worker2));
validatePages(pages.get(), getExpectedPageInformationList(worker0, worker1, worker2));
}
@ -160,7 +160,7 @@ public class SqlStatementResourceHelperTest
Optional<List<PageInformation>> pages =
SqlStatementResourceHelper.populatePageList(payload, DurableStorageMSQDestination.instance());
validatePages(pages.get(), createValidationMap(worker0, worker1, worker2, worker3));
validatePages(pages.get(), getExpectedPageInformationList(worker0, worker1, worker2, worker3));
}
@ -200,10 +200,9 @@ public class SqlStatementResourceHelperTest
payload,
DurableStorageMSQDestination.instance()
);
validatePages(pages.get(), createValidationMap(worker0, worker1, worker2, worker3));
validatePages(pages.get(), getExpectedPageInformationList(worker0, worker1, worker2, worker3));
}
@Test
public void testConsecutivePartitionsOnEachWorker()
{
@ -240,41 +239,148 @@ public class SqlStatementResourceHelperTest
payload,
DurableStorageMSQDestination.instance()
);
validatePages(pages.get(), createValidationMap(worker0, worker1, worker2, worker3));
validatePages(pages.get(), getExpectedPageInformationList(worker0, worker1, worker2, worker3));
}
private void validatePages(
List<PageInformation> pageList,
Map<Integer, Map<Integer, Pair<Long, Long>>> partitionToWorkerToRowsBytes
)
/**
* Durable storage destination applies only to SELECT queries and unlike ingest queries, emtpy worker counters will not
* be reported in this case. See {@link #testEmptyCountersForTaskReportDestination()} and {@link #testEmptyCountersForDataSourceDestination()}
* to see the difference.
*/
@Test
public void testEmptyCountersForDurableStorageDestination()
{
int currentPage = 0;
for (Map.Entry<Integer, Map<Integer, Pair<Long, Long>>> partitionWorker : partitionToWorkerToRowsBytes.entrySet()) {
for (Map.Entry<Integer, Pair<Long, Long>> workerRowsBytes : partitionWorker.getValue().entrySet()) {
PageInformation pageInformation = pageList.get(currentPage);
Assert.assertEquals(currentPage, pageInformation.getId());
Assert.assertEquals(workerRowsBytes.getValue().lhs, pageInformation.getNumRows());
Assert.assertEquals(workerRowsBytes.getValue().rhs, pageInformation.getSizeInBytes());
Assert.assertEquals(partitionWorker.getKey(), pageInformation.getPartition());
Assert.assertEquals(workerRowsBytes.getKey(), pageInformation.getWorker());
log.debug(pageInformation.toString());
currentPage++;
}
}
Assert.assertEquals(currentPage, pageList.size());
CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree();
ChannelCounters worker0 = createChannelCounters(new int[0]);
counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of()));
MSQTaskReportPayload payload = new MSQTaskReportPayload(
new MSQStatusReport(
TaskState.SUCCESS,
null,
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
),
MSQStagesReport.create(
MSQTaskReportTest.QUERY_DEFINITION,
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(0, 1),
ImmutableMap.of(0, 1)
),
counterSnapshots,
null
);
Optional<List<PageInformation>> pages = SqlStatementResourceHelper.populatePageList(
payload,
DurableStorageMSQDestination.instance()
);
validatePages(pages.get(), getExpectedPageInformationList(worker0));
}
private Map<Integer, Map<Integer, Pair<Long, Long>>> createValidationMap(
ChannelCounters... workers
)
@Test
public void testEmptyCountersForTaskReportDestination()
{
if (workers == null || workers.length == 0) {
return new HashMap<>();
CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree();
counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of()));
MSQTaskReportPayload payload = new MSQTaskReportPayload(
new MSQStatusReport(
TaskState.SUCCESS,
null,
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
),
MSQStagesReport.create(
MSQTaskReportTest.QUERY_DEFINITION,
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(0, 1),
ImmutableMap.of(0, 1)
),
counterSnapshots,
null
);
Optional<List<PageInformation>> pages = SqlStatementResourceHelper.populatePageList(
payload,
TaskReportMSQDestination.instance()
);
Assert.assertTrue(pages.isPresent());
Assert.assertEquals(1, pages.get().size());
Assert.assertEquals(new PageInformation(0, 0L, 0L), pages.get().get(0));
}
@Test
public void testEmptyCountersForDataSourceDestination()
{
CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree();
counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of()));
MSQTaskReportPayload payload = new MSQTaskReportPayload(
new MSQStatusReport(
TaskState.SUCCESS,
null,
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
),
MSQStagesReport.create(
MSQTaskReportTest.QUERY_DEFINITION,
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(0, 1),
ImmutableMap.of(0, 1)
),
counterSnapshots,
null
);
Optional<List<PageInformation>> pages = SqlStatementResourceHelper.populatePageList(
payload,
new DataSourceMSQDestination(
"test",
Granularities.DAY,
null,
null
)
);
Assert.assertTrue(pages.isPresent());
Assert.assertEquals(1, pages.get().size());
Assert.assertEquals(new PageInformation(0, 0L, null), pages.get().get(0));
}
private void validatePages(List<PageInformation> actualPageList, List<PageInformation> expectedPageList)
{
Assert.assertEquals(expectedPageList.size(), actualPageList.size());
Assert.assertEquals(expectedPageList, actualPageList);
}
private List<PageInformation> getExpectedPageInformationList(ChannelCounters... workerCounters)
{
List<PageInformation> pageInformationList = new ArrayList<>();
if (workerCounters == null || workerCounters.length == 0) {
return pageInformationList;
} else {
Map<Integer, Map<Integer, Pair<Long, Long>>> partitionToWorkerToRowsBytes = new TreeMap<>();
for (int worker = 0; worker < workers.length; worker++) {
ChannelCounters.Snapshot workerCounter = workers[worker].snapshot();
for (int worker = 0; worker < workerCounters.length; worker++) {
ChannelCounters.Snapshot workerCounter = workerCounters[worker].snapshot();
for (int partition = 0; workerCounter != null && partition < workerCounter.getRows().length; partition++) {
Map<Integer, Pair<Long, Long>> workerMap = partitionToWorkerToRowsBytes.computeIfAbsent(
partition,
@ -290,14 +396,27 @@ public class SqlStatementResourceHelperTest
)
);
}
}
}
return partitionToWorkerToRowsBytes;
// Construct the pages based on the order of partitionToWorkerMap.
for (Map.Entry<Integer, Map<Integer, Pair<Long, Long>>> partitionToWorkerMap : partitionToWorkerToRowsBytes.entrySet()) {
for (Map.Entry<Integer, Pair<Long, Long>> workerToRowsBytesMap : partitionToWorkerMap.getValue().entrySet()) {
pageInformationList.add(
new PageInformation(
pageInformationList.size(),
workerToRowsBytesMap.getValue().lhs,
workerToRowsBytesMap.getValue().rhs,
workerToRowsBytesMap.getKey(),
partitionToWorkerMap.getKey()
)
);
}
}
return pageInformationList;
}
}
private ChannelCounters createChannelCounters(int[] partitions)
{
if (partitions == null || partitions.length == 0) {

View File

@ -76,12 +76,12 @@ public class QueryHostFinder
Server chosenServer = avaticaConnectionBalancer.pickServer(getAllServers(), connectionId);
assertServerFound(
chosenServer,
"No server found for Avatica request with connectionId [%s]",
"No server found for Avatica request with connectionId[%s]",
connectionId
);
log.debug(
"Balancer class [%s] sending request with connectionId [%s] to server: %s",
"Balancer class[%s] sending request with connectionId[%s] to server[%s]",
avaticaConnectionBalancer.getClass(),
connectionId,
chosenServer.getHost()
@ -120,7 +120,7 @@ public class QueryHostFinder
Server server = findDefaultServer();
assertServerFound(
server,
"There are no available brokers. Please check that your brokers are running and " + " healthy."
"There are no available brokers. Please check that your brokers are running and healthy."
);
return server;
}
@ -136,7 +136,7 @@ public class QueryHostFinder
if (server == null) {
log.error(
"No server found for serviceName [%s]. Using backup",
"No server found for serviceName[%s]. Using backup",
serviceName
);
@ -144,7 +144,7 @@ public class QueryHostFinder
if (server == null) {
log.error(
"No backup found for serviceName [%s]. Using default [%s]",
"No backup found for serviceName[%s]. Using default[%s]",
serviceName,
hostSelector.getDefaultServiceName()
);
@ -162,7 +162,7 @@ public class QueryHostFinder
private void assertServerFound(Server server, String messageFormat, Object... args)
{
if (server != null) {
log.debug("Selected [%s]", server.getHost());
log.debug("Selected server[%s]", server.getHost());
return;
}