fix KafkaSupervisor stats report error (#6508)

* fix kafkasupervisor stats 500

* added unit test

* throw error if group already exists
This commit is contained in:
Joshua Sun 2018-10-25 15:45:54 -07:00 committed by Gian Merlino
parent ee1fc93f97
commit f7753ef1e2
2 changed files with 99 additions and 12 deletions

View File

@ -2416,21 +2416,21 @@ public class KafkaSupervisor implements Supervisor
}
for (int groupId : pendingCompletionTaskGroups.keySet()) {
TaskGroup group = taskGroups.get(groupId);
for (String taskId : group.taskIds()) {
futures.add(
Futures.transform(
taskClient.getMovingAveragesAsync(taskId),
(Function<Map<String, Object>, StatsFromTaskResult>) (currentStats) -> {
return new StatsFromTaskResult(
List<TaskGroup> pendingGroups = pendingCompletionTaskGroups.get(groupId);
for (TaskGroup pendingGroup : pendingGroups) {
for (String taskId : pendingGroup.taskIds()) {
futures.add(
Futures.transform(
taskClient.getMovingAveragesAsync(taskId),
(Function<Map<String, Object>, StatsFromTaskResult>) (currentStats) -> new StatsFromTaskResult(
groupId,
taskId,
currentStats
);
}
)
);
groupAndTaskIds.add(new Pair<>(groupId, taskId));
)
)
);
groupAndTaskIds.add(new Pair<>(groupId, taskId));
}
}
}
@ -2449,6 +2449,50 @@ public class KafkaSupervisor implements Supervisor
return allStats;
}
@VisibleForTesting
void addTaskGroupToActivelyReadingTaskGroup(
int taskGroupId,
ImmutableMap<Integer, Long> partitionOffsets,
Optional<DateTime> minMsgTime,
Optional<DateTime> maxMsgTime,
Set<String> tasks
)
{
TaskGroup group = new TaskGroup(
taskGroupId,
partitionOffsets,
minMsgTime,
maxMsgTime
);
group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData())));
if (taskGroups.putIfAbsent(taskGroupId, group) != null) {
throw new ISE(
"trying to add taskGroup with ID [%s] to actively reading task groups, but group already exists.",
taskGroupId
);
}
}
@VisibleForTesting
void addTaskGroupToPendingCompletionTaskGroup(
int taskGroupId,
ImmutableMap<Integer, Long> partitionOffsets,
Optional<DateTime> minMsgTime,
Optional<DateTime> maxMsgTime,
Set<String> tasks
)
{
TaskGroup group = new TaskGroup(
taskGroupId,
partitionOffsets,
minMsgTime,
maxMsgTime
);
group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData())));
pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, x -> new CopyOnWriteArrayList<>())
.add(group);
}
@VisibleForTesting
@Nullable
TaskGroup removeTaskGroup(int taskGroupId)

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import kafka.admin.AdminUtils;
@ -2546,6 +2547,48 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(Long.MAX_VALUE, (long) taskConfig.getEndPartitions().getPartitionOffsetMap().get(2));
}
@Test
public void testGetCurrentTotalStats() throws Exception
{
supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition(0),
ImmutableMap.of(0, 0L),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task1")
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition(1),
ImmutableMap.of(0, 0L),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task2")
);
expect(taskClient.getMovingAveragesAsync("task1")).andReturn(Futures.immediateFuture(ImmutableMap.of(
"prop1",
"val1"
))).times(1);
expect(taskClient.getMovingAveragesAsync("task2")).andReturn(Futures.immediateFuture(ImmutableMap.of(
"prop2",
"val2"
))).times(1);
replayAll();
Map<String, Map<String, Object>> stats = supervisor.getStats();
verifyAll();
Assert.assertEquals(2, stats.size());
Assert.assertEquals(ImmutableSet.of("0", "1"), stats.keySet());
Assert.assertEquals(ImmutableMap.of("task1", ImmutableMap.of("prop1", "val1")), stats.get("0"));
Assert.assertEquals(ImmutableMap.of("task2", ImmutableMap.of("prop2", "val2")), stats.get("1"));
}
private void addSomeEvents(int numEventsPerPartition) throws Exception
{