This commit is contained in:
neha-ellur 2024-12-17 18:03:23 -08:00
parent 7c1e15b635
commit 4991db104a
1 changed files with 23 additions and 0 deletions

View File

@ -86,8 +86,10 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.counters.QueryCounterSnapshot;
import org.apache.druid.msq.indexing.InputChannelFactory;
import org.apache.druid.msq.indexing.InputChannelsImpl;
import org.apache.druid.msq.indexing.MSQControllerTask;
@ -329,6 +331,27 @@ public class ControllerImpl implements Controller
}
// Call onQueryComplete after Closer is fully closed, ensuring no controller-related processing is ongoing.
queryListener.onQueryComplete(reportPayload);
long totalProcessedBytes = reportPayload.getCounters().copyMap().values().stream()
.mapToLong(integerCounterSnapshotsMap -> integerCounterSnapshotsMap.values().stream()
.mapToLong(counterSnapshots -> {
Map<String, QueryCounterSnapshot> workerCounters = counterSnapshots.getMap();
return workerCounters.entrySet().stream()
.mapToLong(channel -> {
if (channel.getKey().startsWith("input")) {
ChannelCounters.Snapshot snapshot = (ChannelCounters.Snapshot) channel.getValue();
return snapshot.getBytes() == null ? 0L :
Arrays.stream(snapshot.getBytes()).sum();
}
return 0L;
})
.sum();
})
.sum())
.sum();
log.info("Total processed bytes: %d", totalProcessedBytes);
context.emitMetric("ingest/processed/bytes", totalProcessedBytes);
}
@Override