Improve description field when emitting metric for broadcast failure (#14703)

Changes:
- Emit descriptions such as `Load queue is full` or `Not enough disk space` instead of `Unknown error`
- Rewrite `BroadcastDistributionRuleTest`
Kashif Faraz committed 2023-08-01 10:13:55 +05:30 (via GitHub)
parent 5c96b60162
commit d04521d58f
4 changed files with 212 additions and 370 deletions
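In outline, the change replaces the old catch-all alert with a classified skip reason. A condensed sketch of the new decision order (illustrative only, not part of the diff; the helper name describeSkipReason is hypothetical, while the checks and description strings come from the StrategicSegmentAssigner hunk below):

// Hypothetical helper restating the skip-reason classification this commit
// introduces in StrategicSegmentAssigner.loadBroadcastSegment().
private static String describeSkipReason(ServerHolder server, DataSegment segment)
{
  if (server.getAvailableSize() < segment.getSize()) {
    return "Not enough disk space";   // disk capacity is checked first
  } else if (server.isLoadQueueFull()) {
    return "Load queue is full";      // uses the new ServerHolder.isLoadQueueFull()
  } else {
    return "Unknown error";           // fallback when no specific cause applies
  }
}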

server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java

@@ -196,6 +196,11 @@ public class ServerHolder implements Comparable<ServerHolder>
    return isDecommissioning;
  }

+  public boolean isLoadQueueFull()
+  {
+    return totalAssignmentsInRun >= maxAssignmentsInRun;
+  }
+
  public long getAvailableSize()
  {
    return getMaxSize() - getSizeUsed();
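A hedged usage sketch of the new method (not part of the diff): server, segmentA, and segmentB are assumed fixtures, and the five-argument ServerHolder constructor is the one exercised in the rewritten BroadcastDistributionRuleTest below, where the fourth argument caps assignments per coordinator run:

// Sketch: a holder capped at 2 assignments per run reports a full queue
// once two loads have been queued in that run.
ServerHolder holder = new ServerHolder(
    server.toImmutableDruidServer(), new TestLoadQueuePeon(), false, 2, 100
);
holder.startOperation(SegmentAction.LOAD, segmentA);
holder.startOperation(SegmentAction.LOAD, segmentB);
Assert.assertTrue(holder.isLoadQueueFull());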

server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java

@@ -355,7 +355,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
  }

  /**
-   * Loads the broadcast segment if it is not loaded on the given server.
+   * Loads the broadcast segment if it is not already loaded on the given server.
   * Returns true only if the segment was successfully queued for load on the server.
   */
  private boolean loadBroadcastSegment(DataSegment segment, ServerHolder server)
@@ -364,19 +364,21 @@
      return false;
    } else if (server.isDroppingSegment(segment)) {
      return server.cancelOperation(SegmentAction.DROP, segment);
-    } else if (server.canLoadSegment(segment)) {
-      return loadSegment(segment, server);
+    }
+
+    if (server.canLoadSegment(segment) && loadSegment(segment, server)) {
+      return true;
+    }
+
+    final String skipReason;
+    if (server.getAvailableSize() < segment.getSize()) {
+      skipReason = "Not enough disk space";
+    } else if (server.isLoadQueueFull()) {
+      skipReason = "Load queue is full";
    } else {
-      log.makeAlert("Could not assign broadcast segment for datasource [%s]", segment.getDataSource())
-          .addData("segmentId", segment.getId())
-          .addData("segmentSize", segment.getSize())
-          .addData("hostName", server.getServer().getHost())
-          .addData("availableSize", server.getAvailableSize())
-          .emit();
-      return false;
+      skipReason = "Unknown error";
    }
+
+    incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, skipReason, segment, server.getServer().getTier());
+    return false;
  }
/**
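Once incrementSkipStat records the reason, the description becomes a dimension of the skip stat. A minimal lookup sketch (assuming stats is the CoordinatorRunStats produced by the coordinator run; the RowKey usage mirrors the assertions in the rewritten test below):

// Count broadcast assignments that were skipped for a specific reason.
RowKey metricKey = RowKey.with(Dimension.DATASOURCE, "wiki")
                         .with(Dimension.TIER, "tier1")
                         .and(Dimension.DESCRIPTION, "Not enough disk space");
long skippedForDiskSpace = stats.get(Stats.Segments.ASSIGN_SKIPPED, metricKey);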

server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java

@@ -195,5 +195,6 @@ public class ServerHolderTest
    Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1)));
    Assert.assertTrue(h1.hasSegmentLoaded(SEGMENTS.get(0).getId()));
    Assert.assertFalse(h1.hasSegmentLoaded(SEGMENTS.get(1).getId()));
+    Assert.assertFalse(h1.isLoadQueueFull());
  }
}

server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java

@@ -21,407 +21,203 @@ package org.apache.druid.server.coordinator.rules;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
+import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.NoneShardSpec;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
public class BroadcastDistributionRuleTest
{
-  private DruidCluster druidCluster;
-  private ServerHolder holderOfSmallSegment;
-  private final List<ServerHolder> holdersOfLargeSegments = new ArrayList<>();
-  private final List<ServerHolder> holdersOfLargeSegments2 = new ArrayList<>();
-  private final List<DataSegment> largeSegments = new ArrayList<>();
-  private final List<DataSegment> largeSegments2 = new ArrayList<>();
-  private DataSegment smallSegment;
-  private DruidCluster secondCluster;
-  private ServerHolder activeServer;
-  private ServerHolder decommissioningServer1;
-  private ServerHolder decommissioningServer2;
-  private SegmentLoadQueueManager loadQueueManager;
+  private int serverId = 0;

-  private static final String DS_SMALL = "small_source";
+  private static final String DS_WIKI = "wiki";
  private static final String TIER_1 = "tier1";
  private static final String TIER_2 = "tier2";

+  private final DataSegment wikiSegment
+      = CreateDataSegments.ofDatasource(DS_WIKI).eachOfSizeInMb(100).get(0);

  @Before
  public void setUp()
  {
-    loadQueueManager = new SegmentLoadQueueManager(null, null, null);
-    smallSegment = new DataSegment(
-        DS_SMALL,
-        Intervals.of("0/1000"),
-        DateTimes.nowUtc().toString(),
-        new HashMap<>(),
-        new ArrayList<>(),
-        new ArrayList<>(),
-        NoneShardSpec.instance(),
-        0,
-        0
-    );
+    serverId = 0;
  }
+  @Test
+  public void testSegmentIsBroadcastToAllTiers()
+  {
+    // 2 tiers with one server each
+    final ServerHolder serverT11 = create10gbHistorical(TIER_1);
+    final ServerHolder serverT21 = create10gbHistorical(TIER_2);
+    DruidCluster cluster = DruidCluster.builder().add(serverT11).add(serverT21).build();
+    DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, wikiSegment);
+
+    ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
+    CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params);
+
+    // Verify that segment is assigned to servers of all tiers
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI));
+    Assert.assertTrue(serverT11.isLoadingSegment(wikiSegment));
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_2, DS_WIKI));
+    Assert.assertTrue(serverT21.isLoadingSegment(wikiSegment));
+  }
+
+  @Test
+  public void testSegmentIsNotBroadcastToServerIfAlreadyLoaded()
+  {
+    // serverT11 is already serving the segment which is being broadcast
+    final ServerHolder serverT11 = create10gbHistorical(TIER_1, wikiSegment);
+    final ServerHolder serverT12 = create10gbHistorical(TIER_1);
+    DruidCluster cluster = DruidCluster.builder().add(serverT11).add(serverT12).build();
+    DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, wikiSegment);
+
+    ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
+    CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params);
+
+    // Verify that serverT11 is already serving and serverT12 is loading segment
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI));
+    Assert.assertFalse(serverT11.isLoadingSegment(wikiSegment));
+    Assert.assertTrue(serverT11.isServingSegment(wikiSegment));
+    Assert.assertTrue(serverT12.isLoadingSegment(wikiSegment));
+  }
+
+  @Test
+  public void testSegmentIsNotBroadcastToDecommissioningServer()
+  {
+    ServerHolder activeServer = create10gbHistorical(TIER_1);
+    ServerHolder decommissioningServer = createDecommissioningHistorical(TIER_1);
+    DruidCluster cluster = DruidCluster.builder()
+                                       .add(activeServer)
+                                       .add(decommissioningServer).build();
+    DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, wikiSegment);
+
+    ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
+    CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params);
+
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI));
+    Assert.assertTrue(activeServer.isLoadingSegment(wikiSegment));
+    Assert.assertTrue(decommissioningServer.getLoadingSegments().isEmpty());
+  }
+
+  @Test
+  public void testBroadcastSegmentIsDroppedFromDecommissioningServer()
+  {
+    // Both active and decommissioning servers are already serving the segment
+    ServerHolder activeServer = create10gbHistorical(TIER_1, wikiSegment);
+    ServerHolder decommissioningServer = createDecommissioningHistorical(TIER_1, wikiSegment);
+    DruidCluster cluster = DruidCluster.builder()
+                                       .add(activeServer)
+                                       .add(decommissioningServer)
+                                       .build();
+    DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, wikiSegment);
+
+    ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
+    CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params);
+
+    // Verify that segment is dropped only from the decommissioning server
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, TIER_1, DS_WIKI));
+    Assert.assertTrue(activeServer.getPeon().getSegmentsToDrop().isEmpty());
+    Assert.assertTrue(decommissioningServer.getPeon().getSegmentsToDrop().contains(wikiSegment));
+  }
+  @Test
+  public void testSegmentIsBroadcastToAllServerTypes()
+  {
+    final ServerHolder broker = new ServerHolder(
+        create10gbServer(ServerType.BROKER, "broker_tier").toImmutableDruidServer(),
+        new TestLoadQueuePeon()
+    );
+    final ServerHolder indexer = new ServerHolder(
+        create10gbServer(ServerType.INDEXER_EXECUTOR, TIER_2).toImmutableDruidServer(),
+        new TestLoadQueuePeon()
+    );
+    final ServerHolder historical = create10gbHistorical(TIER_1);
-    for (int i = 0; i < 3; i++) {
-      largeSegments.add(
-          new DataSegment(
-              "large_source",
-              Intervals.of((i * 1000) + "/" + ((i + 1) * 1000)),
-              DateTimes.nowUtc().toString(),
-              new HashMap<>(),
-              new ArrayList<>(),
-              new ArrayList<>(),
-              NoneShardSpec.instance(),
-              0,
-              100
-          )
-      );
-    }
+
+    DruidCluster cluster = DruidCluster.builder()
+                                       .add(broker).add(indexer).add(historical)
+                                       .build();
+    DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, wikiSegment);
-    for (int i = 0; i < 2; i++) {
-      largeSegments2.add(
-          new DataSegment(
-              "large_source2",
-              Intervals.of((i * 1000) + "/" + ((i + 1) * 1000)),
-              DateTimes.nowUtc().toString(),
-              new HashMap<>(),
-              new ArrayList<>(),
-              new ArrayList<>(),
-              NoneShardSpec.instance(),
-              0,
-              100
-          )
-      );
-    }
+
+    ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
+    final CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params);
-    holderOfSmallSegment = new ServerHolder(
-        new DruidServer(
-            "serverHot2",
-            "hostHot2",
-            null,
-            1000,
-            ServerType.HISTORICAL,
-            TIER_1,
-            0
-        ).addDataSegment(smallSegment)
-         .toImmutableDruidServer(),
-        new TestLoadQueuePeon()
-    );
+
+    // Verify that segment is assigned to historical, broker as well as indexer
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI));
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_2, DS_WIKI));
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, broker.getServer().getTier(), DS_WIKI));
+    Assert.assertTrue(historical.isLoadingSegment(wikiSegment));
+    Assert.assertTrue(indexer.isLoadingSegment(wikiSegment));
+    Assert.assertTrue(broker.isLoadingSegment(wikiSegment));
+  }
+  @Test
+  public void testReasonForBroadcastFailure()
+  {
+    final ServerHolder eligibleServer = create10gbHistorical(TIER_1);
+
+    final ServerHolder serverWithNoDiskSpace = new ServerHolder(
+        new DruidServer("server1", "server1", null, 0L, ServerType.HISTORICAL, TIER_1, 0)
+            .toImmutableDruidServer(),
+        new TestLoadQueuePeon()
+    );
-    holdersOfLargeSegments.add(
-        new ServerHolder(
-            new DruidServer(
-                "serverHot1",
-                "hostHot1",
-                null,
-                1000,
-                ServerType.HISTORICAL,
-                TIER_1,
-                0
-            ).addDataSegment(largeSegments.get(0))
-             .toImmutableDruidServer(),
-            new TestLoadQueuePeon()
-        )
-    );
-    holdersOfLargeSegments.add(
-        new ServerHolder(
-            new DruidServer(
-                "serverNorm1",
-                "hostNorm1",
-                null,
-                1000,
-                ServerType.HISTORICAL,
-                TIER_2,
-                0
-            ).addDataSegment(largeSegments.get(1))
-             .toImmutableDruidServer(),
-            new TestLoadQueuePeon()
-        )
-    );
-    holdersOfLargeSegments.add(
-        new ServerHolder(
-            new DruidServer(
-                "serverNorm2",
-                "hostNorm2",
-                null,
-                100,
-                ServerType.HISTORICAL,
-                TIER_2,
-                0
-            ).addDataSegment(largeSegments.get(2))
-             .toImmutableDruidServer(),
-            new TestLoadQueuePeon()
-        )
-    );
+
+    // Create a server with full load queue
+    final int maxSegmentsInLoadQueue = 5;
+    final ServerHolder serverWithFullQueue = new ServerHolder(
+        create10gbServer(ServerType.HISTORICAL, TIER_1).toImmutableDruidServer(),
+        new TestLoadQueuePeon(), false, maxSegmentsInLoadQueue, 100
+    );
-    holdersOfLargeSegments2.add(
-        new ServerHolder(
-            new DruidServer(
-                "serverHot3",
-                "hostHot3",
-                null,
-                1000,
-                ServerType.HISTORICAL,
-                TIER_1,
-                0
-            ).addDataSegment(largeSegments2.get(0))
-             .toImmutableDruidServer(),
-            new TestLoadQueuePeon()
-        )
-    );
-    holdersOfLargeSegments2.add(
-        new ServerHolder(
-            new DruidServer(
-                "serverNorm3",
-                "hostNorm3",
-                null,
-                100,
-                ServerType.HISTORICAL,
-                TIER_2,
-                0
-            ).addDataSegment(largeSegments2.get(1))
-             .toImmutableDruidServer(),
-            new TestLoadQueuePeon()
-        )
-    );
+
+    List<DataSegment> segmentsInQueue
+        = CreateDataSegments.ofDatasource("koala")
+                            .forIntervals(maxSegmentsInLoadQueue, Granularities.MONTH)
+                            .withNumPartitions(1)
+                            .eachOfSizeInMb(10);
+    segmentsInQueue.forEach(s -> serverWithFullQueue.startOperation(SegmentAction.LOAD, s));
+    Assert.assertTrue(serverWithFullQueue.isLoadQueueFull());
-    activeServer = new ServerHolder(
-        new DruidServer(
-            "active",
-            "host1",
-            null,
-            100,
-            ServerType.HISTORICAL,
-            TIER_1,
-            0
-        ).addDataSegment(largeSegments.get(0))
-         .toImmutableDruidServer(),
-        new TestLoadQueuePeon()
-    );
+
+    DruidCluster cluster = DruidCluster.builder()
+                                       .add(eligibleServer)
+                                       .add(serverWithNoDiskSpace)
+                                       .add(serverWithFullQueue)
+                                       .build();
+    DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, wikiSegment);
-    decommissioningServer1 = new ServerHolder(
-        new DruidServer(
-            "decommissioning1",
-            "host2",
-            null,
-            100,
-            ServerType.HISTORICAL,
-            TIER_1,
-            0
-        ).addDataSegment(smallSegment)
-         .toImmutableDruidServer(),
-        new TestLoadQueuePeon(),
-        true
-    );
+
+    ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
+    final CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params);
-    decommissioningServer2 = new ServerHolder(
-        new DruidServer(
-            "decommissioning2",
-            "host3",
-            null,
-            100,
-            ServerType.HISTORICAL,
-            TIER_1,
-            0
-        ).addDataSegment(largeSegments.get(1))
-         .toImmutableDruidServer(),
-        new TestLoadQueuePeon(),
-        true
-    );
+
+    // Verify that the segment is broadcast only to the eligible server
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI));
+    RowKey metricKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI)
+                             .with(Dimension.TIER, TIER_1)
+                             .and(Dimension.DESCRIPTION, "Not enough disk space");
+    Assert.assertEquals(1L, stats.get(Stats.Segments.ASSIGN_SKIPPED, metricKey));
-    druidCluster = DruidCluster
-        .builder()
-        .addTier(
-            TIER_1,
-            holdersOfLargeSegments.get(0),
-            holderOfSmallSegment,
-            holdersOfLargeSegments2.get(0)
-        )
-        .addTier(
-            TIER_2,
-            holdersOfLargeSegments.get(1),
-            holdersOfLargeSegments.get(2),
-            holdersOfLargeSegments2.get(1)
-        )
-        .build();
-    secondCluster = DruidCluster
-        .builder()
-        .addTier(
-            TIER_1,
-            activeServer,
-            decommissioningServer1,
-            decommissioningServer2
-        )
-        .build();
+
+    metricKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI)
+                      .with(Dimension.TIER, TIER_1)
+                      .and(Dimension.DESCRIPTION, "Load queue is full");
+    Assert.assertEquals(1L, stats.get(Stats.Segments.ASSIGN_SKIPPED, metricKey));
+  }
-  @Test
-  public void testBroadcastToSingleDataSource()
-  {
-    final ForeverBroadcastDistributionRule rule =
-        new ForeverBroadcastDistributionRule();
-    CoordinatorRunStats stats = runRuleAndGetStats(
-        rule,
-        smallSegment,
-        makeCoordinartorRuntimeParams(
-            druidCluster,
-            smallSegment,
-            largeSegments.get(0),
-            largeSegments.get(1),
-            largeSegments.get(2),
-            largeSegments2.get(0),
-            largeSegments2.get(1)
-        )
-    );
-
-    Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_SMALL));
-    Assert.assertEquals(3L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_2, DS_SMALL));
-    Assert.assertTrue(
-        holdersOfLargeSegments.stream().allMatch(
-            holder -> holder.isLoadingSegment(smallSegment)
-        )
-    );
-    Assert.assertTrue(
-        holdersOfLargeSegments2.stream().allMatch(
-            holder -> holder.isLoadingSegment(smallSegment)
-        )
-    );
-    Assert.assertTrue(holderOfSmallSegment.isServingSegment(smallSegment));
-  }
-
-  private DruidCoordinatorRuntimeParams makeCoordinartorRuntimeParams(
-      DruidCluster druidCluster,
-      DataSegment... usedSegments
-  )
-  {
-    return DruidCoordinatorRuntimeParams
-        .newBuilder(DateTimes.nowUtc())
-        .withDruidCluster(druidCluster)
-        .withUsedSegmentsInTest(usedSegments)
-        .withBalancerStrategy(new RandomBalancerStrategy())
-        .withSegmentAssignerUsing(loadQueueManager)
-        .build();
-  }
-  /**
-   * Servers:
-   * name             | segments
-   * -----------------+--------------
-   * active           | large segment
-   * decommissioning1 | small segment
-   * decommissioning2 | large segment
-   * <p>
-   * After running the rule for the small segment:
-   * active           | large & small segments
-   * decommissioning1 |
-   * decommissionint2 | large segment
-   */
-  @Test
-  public void testBroadcastDecommissioning()
-  {
-    final ForeverBroadcastDistributionRule rule =
-        new ForeverBroadcastDistributionRule();
-    CoordinatorRunStats stats = runRuleAndGetStats(
-        rule,
-        smallSegment,
-        makeCoordinartorRuntimeParams(
-            secondCluster,
-            smallSegment,
-            largeSegments.get(0),
-            largeSegments.get(1)
-        )
-    );
-
-    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_SMALL));
-    Assert.assertEquals(1, activeServer.getPeon().getSegmentsToLoad().size());
-    Assert.assertEquals(1, decommissioningServer1.getPeon().getSegmentsToDrop().size());
-    Assert.assertEquals(0, decommissioningServer2.getPeon().getSegmentsToLoad().size());
-  }
-  @Test
-  public void testBroadcastToMultipleDataSources()
-  {
-    final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
-    CoordinatorRunStats stats = runRuleAndGetStats(
-        rule,
-        smallSegment,
-        makeCoordinartorRuntimeParams(
-            druidCluster,
-            smallSegment,
-            largeSegments.get(0),
-            largeSegments.get(1),
-            largeSegments.get(2),
-            largeSegments2.get(0),
-            largeSegments2.get(1)
-        )
-    );
-
-    Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_SMALL));
-    Assert.assertEquals(3L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_2, DS_SMALL));
-    Assert.assertTrue(
-        holdersOfLargeSegments.stream().allMatch(
-            holder -> holder.isLoadingSegment(smallSegment)
-        )
-    );
-    Assert.assertTrue(
-        holdersOfLargeSegments2.stream().allMatch(
-            holder -> holder.isLoadingSegment(smallSegment)
-        )
-    );
-    Assert.assertFalse(holderOfSmallSegment.isLoadingSegment(smallSegment));
-  }
-
-  @Test
-  public void testBroadcastToAllServers()
-  {
-    final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
-    CoordinatorRunStats stats = runRuleAndGetStats(
-        rule,
-        smallSegment,
-        makeCoordinartorRuntimeParams(
-            druidCluster,
-            smallSegment,
-            largeSegments.get(0),
-            largeSegments.get(1),
-            largeSegments.get(2),
-            largeSegments2.get(0),
-            largeSegments2.get(1)
-        )
-    );
-
-    Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_SMALL));
-    Assert.assertEquals(3L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_2, DS_SMALL));
-    Assert.assertTrue(
-        druidCluster.getAllServers().stream().allMatch(
-            holder -> holder.isLoadingSegment(smallSegment) || holder.isServingSegment(smallSegment)
-        )
-    );
-  }
-  private CoordinatorRunStats runRuleAndGetStats(
+  private CoordinatorRunStats runRuleOnSegment(
      Rule rule,
      DataSegment segment,
      DruidCoordinatorRuntimeParams params
@@ -431,4 +227,42 @@ public class BroadcastDistributionRuleTest
    rule.run(segment, segmentAssigner);
    return segmentAssigner.getStats();
  }
+
+  private DruidCoordinatorRuntimeParams makeParamsWithUsedSegments(
+      DruidCluster druidCluster,
+      DataSegment... usedSegments
+  )
+  {
+    return DruidCoordinatorRuntimeParams
+        .newBuilder(DateTimes.nowUtc())
+        .withDruidCluster(druidCluster)
+        .withUsedSegmentsInTest(usedSegments)
+        .withBalancerStrategy(new RandomBalancerStrategy())
+        .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null, null))
+        .build();
+  }
+
+  private ServerHolder create10gbHistorical(String tier, DataSegment... segments)
+  {
+    DruidServer server = create10gbServer(ServerType.HISTORICAL, tier);
+    for (DataSegment segment : segments) {
+      server.addDataSegment(segment);
+    }
+    return new ServerHolder(server.toImmutableDruidServer(), new TestLoadQueuePeon());
+  }
+
+  private ServerHolder createDecommissioningHistorical(String tier, DataSegment... segments)
+  {
+    DruidServer server = create10gbServer(ServerType.HISTORICAL, tier);
+    for (DataSegment segment : segments) {
+      server.addDataSegment(segment);
+    }
+    return new ServerHolder(server.toImmutableDruidServer(), new TestLoadQueuePeon(), true);
+  }
+
+  private DruidServer create10gbServer(ServerType type, String tier)
+  {
+    final String name = "server_" + serverId++;
+    return new DruidServer(name, name, null, 10L << 30, type, tier, 0);
+  }
}