mirror of https://github.com/apache/druid.git
Add column replication_factor to sys.segments table (#14403)
Description: Druid allows a configuration of load rules that may cause a used segment to not be loaded on any historical. This status is not tracked in the sys.segments table on the broker, which makes it difficult to determine if the unavailability of a segment is expected and if we should not wait for it to be loaded on a server after ingestion has finished. Changes: - Track replication factor in `SegmentReplicantLookup` during evaluation of load rules - Update API `/druid/coordinator/v1/metadata/segments` to return replication factor - Add column `replication_factor` to the sys.segments virtual table and populate it in `MetadataSegmentView` - If this column is 0, the segment is not assigned to any historical and will not be loaded.
This commit is contained in:
parent
bd07c3dd43
commit
128133fadc
|
@ -173,11 +173,11 @@ Returns a list of all segments for one or more specific datasources enabled in t
|
||||||
|
|
||||||
`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus`
|
`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus`
|
||||||
|
|
||||||
Returns a list of all segments for each datasource with the full segment metadata and an extra field `overshadowed`.
|
Returns a list of all segments for each datasource with the full segment metadata and extra fields `overshadowed` and `replicationFactor`.
|
||||||
|
|
||||||
`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&datasources={dataSourceName1}&datasources={dataSourceName2}`
|
`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&datasources={dataSourceName1}&datasources={dataSourceName2}`
|
||||||
|
|
||||||
Returns a list of all segments for one or more specific datasources with the full segment metadata and an extra field `overshadowed`.
|
Returns a list of all segments for one or more specific datasources with the full segment metadata and extra fields `overshadowed` and `replicationFactor`.
|
||||||
|
|
||||||
`GET /druid/coordinator/v1/metadata/datasources`
|
`GET /druid/coordinator/v1/metadata/datasources`
|
||||||
|
|
||||||
|
|
|
@ -157,6 +157,7 @@ Segments table provides details on all Druid segments, whether they are publishe
|
||||||
|dimensions|VARCHAR|JSON-serialized form of the segment dimensions|
|
|dimensions|VARCHAR|JSON-serialized form of the segment dimensions|
|
||||||
|metrics|VARCHAR|JSON-serialized form of the segment metrics|
|
|metrics|VARCHAR|JSON-serialized form of the segment metrics|
|
||||||
|last_compaction_state|VARCHAR|JSON-serialized form of the compaction task's config (compaction task which created this segment). May be null if segment was not created by compaction task.|
|
|last_compaction_state|VARCHAR|JSON-serialized form of the compaction task's config (compaction task which created this segment). May be null if segment was not created by compaction task.|
|
||||||
|
|replication_factor|BIGINT|Total number of replicas of the segment that are required to be loaded across all historical tiers, based on the load rule that currently applies to this segment. If this value is 0, the segment is not assigned to any historical and will not be loaded. This value is -1 if load rules for the segment have not been evaluated yet.|
|
||||||
|
|
||||||
For example, to retrieve all currently active segments for datasource "wikipedia", use the query:
|
For example, to retrieve all currently active segments for datasource "wikipedia", use the query:
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
"shard_spec": "{\"type\":\"none\"}",
|
"shard_spec": "{\"type\":\"none\"}",
|
||||||
"dimensions": "[\"anonymous\",\"area_code\",\"city\",\"continent_code\",\"country_name\",\"dma_code\",\"geo\",\"language\",\"namespace\",\"network\",\"newpage\",\"page\",\"postal_code\",\"region_lookup\",\"robot\",\"unpatrolled\",\"user\"]",
|
"dimensions": "[\"anonymous\",\"area_code\",\"city\",\"continent_code\",\"country_name\",\"dma_code\",\"geo\",\"language\",\"namespace\",\"network\",\"newpage\",\"page\",\"postal_code\",\"region_lookup\",\"robot\",\"unpatrolled\",\"user\"]",
|
||||||
"metrics": "[\"added\",\"count\",\"deleted\",\"delta\",\"delta_hist\",\"unique_users\",\"variation\"]",
|
"metrics": "[\"added\",\"count\",\"deleted\",\"delta\",\"delta_hist\",\"unique_users\",\"variation\"]",
|
||||||
"last_compaction_state": null
|
"last_compaction_state": null,
|
||||||
|
"replication_factor": 2
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
|
@ -23,39 +23,55 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.fasterxml.jackson.annotation.JsonUnwrapped;
|
import com.fasterxml.jackson.annotation.JsonUnwrapped;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* DataSegment object plus the overshadowed status for the segment. An immutable object.
|
* This class represents the current state of a segment in the cluster and encapsulates the following:
|
||||||
*
|
* <ul>
|
||||||
* SegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId}
|
* <li>the {@code DataSegment} object</li>
|
||||||
* of the DataSegment object.
|
* <li>overshadowed status of the segment</li>
|
||||||
|
* <li>replication factor of the segment</li>
|
||||||
|
* </ul>
|
||||||
|
* <br></br>
|
||||||
|
* Objects of this class are used to sync the state of segments from the Coordinator to different services, typically the Broker.
|
||||||
|
* The {@link #compareTo} method considers only the {@link SegmentId}.
|
||||||
*/
|
*/
|
||||||
public class SegmentWithOvershadowedStatus implements Comparable<SegmentWithOvershadowedStatus>
|
public class SegmentStatusInCluster implements Comparable<SegmentStatusInCluster>
|
||||||
{
|
{
|
||||||
private final boolean overshadowed;
|
private final boolean overshadowed;
|
||||||
|
/**
|
||||||
|
* The replication factor for the segment added across all tiers. This value is null if the load rules for
|
||||||
|
* the segment have not been evaluated yet.
|
||||||
|
*/
|
||||||
|
private final Integer replicationFactor;
|
||||||
/**
|
/**
|
||||||
* dataSegment is serialized "unwrapped", i.e. it's properties are included as properties of
|
* dataSegment is serialized "unwrapped", i.e. it's properties are included as properties of
|
||||||
* enclosing class. If in future, if {@code SegmentWithOvershadowedStatus} were to extend {@link DataSegment},
|
* enclosing class. If in the future, if {@code SegmentStatusInCluster} were to extend {@link DataSegment},
|
||||||
* there will be no change in the serialized format.
|
* there will be no change in the serialized format.
|
||||||
*/
|
*/
|
||||||
@JsonUnwrapped
|
@JsonUnwrapped
|
||||||
private final DataSegment dataSegment;
|
private final DataSegment dataSegment;
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
public SegmentWithOvershadowedStatus(
|
public SegmentStatusInCluster(
|
||||||
@JsonProperty("overshadowed") boolean overshadowed
|
@JsonProperty("overshadowed") boolean overshadowed,
|
||||||
|
@JsonProperty("replicationFactor") @Nullable Integer replicationFactor
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
// Jackson will overwrite dataSegment if needed (even though the field is final)
|
// Jackson will overwrite dataSegment if needed (even though the field is final)
|
||||||
this(null, overshadowed);
|
this(null, overshadowed, replicationFactor);
|
||||||
}
|
}
|
||||||
|
|
||||||
public SegmentWithOvershadowedStatus(
|
public SegmentStatusInCluster(
|
||||||
DataSegment dataSegment,
|
DataSegment dataSegment,
|
||||||
boolean overshadowed
|
boolean overshadowed,
|
||||||
|
Integer replicationFactor
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.dataSegment = dataSegment;
|
this.dataSegment = dataSegment;
|
||||||
this.overshadowed = overshadowed;
|
this.overshadowed = overshadowed;
|
||||||
|
this.replicationFactor = replicationFactor;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
|
@ -70,35 +86,36 @@ public class SegmentWithOvershadowedStatus implements Comparable<SegmentWithOver
|
||||||
return dataSegment;
|
return dataSegment;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
@JsonProperty
|
||||||
|
public Integer getReplicationFactor()
|
||||||
|
{
|
||||||
|
return replicationFactor;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o)
|
public boolean equals(Object o)
|
||||||
{
|
{
|
||||||
if (this == o) {
|
if (this == o) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (!(o instanceof SegmentWithOvershadowedStatus)) {
|
if (o == null || getClass() != o.getClass()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
final SegmentWithOvershadowedStatus that = (SegmentWithOvershadowedStatus) o;
|
SegmentStatusInCluster that = (SegmentStatusInCluster) o;
|
||||||
if (!dataSegment.equals(that.dataSegment)) {
|
return overshadowed == that.overshadowed
|
||||||
return false;
|
&& Objects.equals(replicationFactor, that.replicationFactor)
|
||||||
}
|
&& Objects.equals(dataSegment, that.dataSegment);
|
||||||
if (overshadowed != (that.overshadowed)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode()
|
public int hashCode()
|
||||||
{
|
{
|
||||||
int result = dataSegment.hashCode();
|
return Objects.hash(overshadowed, replicationFactor, dataSegment);
|
||||||
result = 31 * result + Boolean.hashCode(overshadowed);
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int compareTo(SegmentWithOvershadowedStatus o)
|
public int compareTo(SegmentStatusInCluster o)
|
||||||
{
|
{
|
||||||
return dataSegment.getId().compareTo(o.dataSegment.getId());
|
return dataSegment.getId().compareTo(o.dataSegment.getId());
|
||||||
}
|
}
|
||||||
|
@ -106,8 +123,9 @@ public class SegmentWithOvershadowedStatus implements Comparable<SegmentWithOver
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return "SegmentWithOvershadowedStatus{" +
|
return "SegmentStatusInCluster{" +
|
||||||
"overshadowed=" + overshadowed +
|
"overshadowed=" + overshadowed +
|
||||||
|
", replicationFactor=" + replicationFactor +
|
||||||
", dataSegment=" + dataSegment +
|
", dataSegment=" + dataSegment +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
|
@ -40,15 +40,17 @@ import javax.annotation.Nullable;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
public class SegmentWithOvershadowedStatusTest
|
public class SegmentStatusInClusterTest
|
||||||
{
|
{
|
||||||
private static final ObjectMapper MAPPER = createObjectMapper();
|
private static final ObjectMapper MAPPER = createObjectMapper();
|
||||||
private static final Interval INTERVAL = Intervals.of("2011-10-01/2011-10-02");
|
private static final Interval INTERVAL = Intervals.of("2011-10-01/2011-10-02");
|
||||||
private static final ImmutableMap<String, Object> LOAD_SPEC = ImmutableMap.of("something", "or_other");
|
private static final ImmutableMap<String, Object> LOAD_SPEC = ImmutableMap.of("something", "or_other");
|
||||||
private static final boolean OVERSHADOWED = true;
|
private static final boolean OVERSHADOWED = true;
|
||||||
|
private static final Integer REPLICATION_FACTOR = 2;
|
||||||
private static final int TEST_VERSION = 0x9;
|
private static final int TEST_VERSION = 0x9;
|
||||||
private static final SegmentWithOvershadowedStatus SEGMENT = createSegmentWithOvershadowedStatus();
|
private static final SegmentStatusInCluster SEGMENT = createSegmentForTest();
|
||||||
|
|
||||||
private static ObjectMapper createObjectMapper()
|
private static ObjectMapper createObjectMapper()
|
||||||
{
|
{
|
||||||
|
@ -59,7 +61,7 @@ public class SegmentWithOvershadowedStatusTest
|
||||||
return objectMapper;
|
return objectMapper;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static SegmentWithOvershadowedStatus createSegmentWithOvershadowedStatus()
|
private static SegmentStatusInCluster createSegmentForTest()
|
||||||
{
|
{
|
||||||
DataSegment dataSegment = new DataSegment(
|
DataSegment dataSegment = new DataSegment(
|
||||||
"something",
|
"something",
|
||||||
|
@ -74,7 +76,7 @@ public class SegmentWithOvershadowedStatusTest
|
||||||
1
|
1
|
||||||
);
|
);
|
||||||
|
|
||||||
return new SegmentWithOvershadowedStatus(dataSegment, OVERSHADOWED);
|
return new SegmentStatusInCluster(dataSegment, OVERSHADOWED, REPLICATION_FACTOR);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -85,7 +87,7 @@ public class SegmentWithOvershadowedStatusTest
|
||||||
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
|
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
|
||||||
);
|
);
|
||||||
|
|
||||||
Assert.assertEquals(11, objectMap.size());
|
Assert.assertEquals(12, objectMap.size());
|
||||||
Assert.assertEquals("something", objectMap.get("dataSource"));
|
Assert.assertEquals("something", objectMap.get("dataSource"));
|
||||||
Assert.assertEquals(INTERVAL.toString(), objectMap.get("interval"));
|
Assert.assertEquals(INTERVAL.toString(), objectMap.get("interval"));
|
||||||
Assert.assertEquals("1", objectMap.get("version"));
|
Assert.assertEquals("1", objectMap.get("version"));
|
||||||
|
@ -96,12 +98,13 @@ public class SegmentWithOvershadowedStatusTest
|
||||||
Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
|
Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
|
||||||
Assert.assertEquals(1, objectMap.get("size"));
|
Assert.assertEquals(1, objectMap.get("size"));
|
||||||
Assert.assertEquals(OVERSHADOWED, objectMap.get("overshadowed"));
|
Assert.assertEquals(OVERSHADOWED, objectMap.get("overshadowed"));
|
||||||
|
Assert.assertEquals(REPLICATION_FACTOR, objectMap.get("replicationFactor"));
|
||||||
|
|
||||||
final String json = MAPPER.writeValueAsString(SEGMENT);
|
final String json = MAPPER.writeValueAsString(SEGMENT);
|
||||||
|
|
||||||
final TestSegmentWithOvershadowedStatus deserializedSegment = MAPPER.readValue(
|
final TestSegment deserializedSegment = MAPPER.readValue(
|
||||||
json,
|
json,
|
||||||
TestSegmentWithOvershadowedStatus.class
|
TestSegment.class
|
||||||
);
|
);
|
||||||
|
|
||||||
DataSegment dataSegment = SEGMENT.getDataSegment();
|
DataSegment dataSegment = SEGMENT.getDataSegment();
|
||||||
|
@ -114,30 +117,33 @@ public class SegmentWithOvershadowedStatusTest
|
||||||
Assert.assertEquals(dataSegment.getShardSpec(), deserializedSegment.getShardSpec());
|
Assert.assertEquals(dataSegment.getShardSpec(), deserializedSegment.getShardSpec());
|
||||||
Assert.assertEquals(dataSegment.getSize(), deserializedSegment.getSize());
|
Assert.assertEquals(dataSegment.getSize(), deserializedSegment.getSize());
|
||||||
Assert.assertEquals(dataSegment.getId(), deserializedSegment.getId());
|
Assert.assertEquals(dataSegment.getId(), deserializedSegment.getId());
|
||||||
|
Assert.assertEquals(OVERSHADOWED, deserializedSegment.isOvershadowed());
|
||||||
|
Assert.assertEquals(REPLICATION_FACTOR, deserializedSegment.getReplicationFactor());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Previously, the implementation of SegmentWithOvershadowedStatus had @JsonCreator/@JsonProperty and @JsonUnwrapped
|
// Previously, the implementation of SegmentStatusInCluster had @JsonCreator/@JsonProperty and @JsonUnwrapped
|
||||||
// on the same field (dataSegment), which used to work in Jackson 2.6, but does not work with Jackson 2.9:
|
// on the same field (dataSegment), which used to work in Jackson 2.6, but does not work with Jackson 2.9:
|
||||||
// https://github.com/FasterXML/jackson-databind/issues/265#issuecomment-264344051
|
// https://github.com/FasterXML/jackson-databind/issues/265#issuecomment-264344051
|
||||||
@Test
|
@Test
|
||||||
public void testJsonCreatorAndJsonUnwrappedAnnotationsAreCompatible() throws Exception
|
public void testJsonCreatorAndJsonUnwrappedAnnotationsAreCompatible() throws Exception
|
||||||
{
|
{
|
||||||
String json = MAPPER.writeValueAsString(SEGMENT);
|
String json = MAPPER.writeValueAsString(SEGMENT);
|
||||||
SegmentWithOvershadowedStatus segment = MAPPER.readValue(json, SegmentWithOvershadowedStatus.class);
|
SegmentStatusInCluster segment = MAPPER.readValue(json, SegmentStatusInCluster.class);
|
||||||
Assert.assertEquals(SEGMENT, segment);
|
Assert.assertEquals(SEGMENT, segment);
|
||||||
Assert.assertEquals(json, MAPPER.writeValueAsString(segment));
|
Assert.assertEquals(json, MAPPER.writeValueAsString(segment));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Subclass of DataSegment with overshadowed status
|
* Flat subclass of DataSegment for testing
|
||||||
*/
|
*/
|
||||||
class TestSegmentWithOvershadowedStatus extends DataSegment
|
class TestSegment extends DataSegment
|
||||||
{
|
{
|
||||||
private final boolean overshadowed;
|
private final boolean overshadowed;
|
||||||
|
private final Integer replicationFactor;
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
public TestSegmentWithOvershadowedStatus(
|
public TestSegment(
|
||||||
@JsonProperty("dataSource") String dataSource,
|
@JsonProperty("dataSource") String dataSource,
|
||||||
@JsonProperty("interval") Interval interval,
|
@JsonProperty("interval") Interval interval,
|
||||||
@JsonProperty("version") String version,
|
@JsonProperty("version") String version,
|
||||||
|
@ -154,7 +160,8 @@ class TestSegmentWithOvershadowedStatus extends DataSegment
|
||||||
@JsonProperty("lasCompactionState") @Nullable CompactionState lastCompactionState,
|
@JsonProperty("lasCompactionState") @Nullable CompactionState lastCompactionState,
|
||||||
@JsonProperty("binaryVersion") Integer binaryVersion,
|
@JsonProperty("binaryVersion") Integer binaryVersion,
|
||||||
@JsonProperty("size") long size,
|
@JsonProperty("size") long size,
|
||||||
@JsonProperty("overshadowed") boolean overshadowed
|
@JsonProperty("overshadowed") boolean overshadowed,
|
||||||
|
@JsonProperty("replicationFactor") Integer replicationFactor
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(
|
super(
|
||||||
|
@ -170,6 +177,7 @@ class TestSegmentWithOvershadowedStatus extends DataSegment
|
||||||
size
|
size
|
||||||
);
|
);
|
||||||
this.overshadowed = overshadowed;
|
this.overshadowed = overshadowed;
|
||||||
|
this.replicationFactor = replicationFactor;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
|
@ -178,23 +186,31 @@ class TestSegmentWithOvershadowedStatus extends DataSegment
|
||||||
return overshadowed;
|
return overshadowed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public Integer getReplicationFactor()
|
||||||
|
{
|
||||||
|
return replicationFactor;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o)
|
public boolean equals(Object o)
|
||||||
{
|
{
|
||||||
if (this == o) {
|
if (this == o) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (!(o instanceof TestSegmentWithOvershadowedStatus)) {
|
if (o == null || getClass() != o.getClass()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (!super.equals(o)) {
|
if (!super.equals(o)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
final TestSegmentWithOvershadowedStatus that = (TestSegmentWithOvershadowedStatus) o;
|
TestSegment that = (TestSegment) o;
|
||||||
if (overshadowed != (that.overshadowed)) {
|
return overshadowed == that.overshadowed && Objects.equals(replicationFactor, that.replicationFactor);
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
return Objects.hash(super.hashCode(), overshadowed, replicationFactor);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -154,6 +154,13 @@ public class DruidCoordinator
|
||||||
|
|
||||||
private volatile boolean started = false;
|
private volatile boolean started = false;
|
||||||
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
|
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Contains a map of segmentId to total replication factor across all tiers. This map is refreshed when load rules are
|
||||||
|
* evaluated. It is used by {@link DruidCoordinator} to supply this value to
|
||||||
|
* {@link org.apache.druid.server.http.MetadataResource}.
|
||||||
|
*/
|
||||||
|
private volatile Object2IntMap<SegmentId> segmentIdToReplicationFactor = null;
|
||||||
private volatile DruidCluster cluster = null;
|
private volatile DruidCluster cluster = null;
|
||||||
|
|
||||||
private int cachedBalancerThreadNumber;
|
private int cachedBalancerThreadNumber;
|
||||||
|
@ -817,6 +824,12 @@ public class DruidCoordinator
|
||||||
return ImmutableList.of(compactSegments);
|
return ImmutableList.of(compactSegments);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
public Integer getReplicationFactorForSegment(SegmentId segmentId)
|
||||||
|
{
|
||||||
|
return segmentIdToReplicationFactor == null ? null : segmentIdToReplicationFactor.getInt(segmentId);
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected class DutiesRunnable implements Runnable
|
protected class DutiesRunnable implements Runnable
|
||||||
{
|
{
|
||||||
|
@ -943,6 +956,13 @@ public class DruidCoordinator
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update the immutable replication factor map with latest values.
|
||||||
|
// This value is set here as it is recalculated during load rule evaluation.
|
||||||
|
if (params.getSegmentReplicantLookup() != null) {
|
||||||
|
segmentIdToReplicationFactor = params.getSegmentReplicantLookup().getSegmentIdToReplicationFactor();
|
||||||
|
}
|
||||||
|
|
||||||
// Emit the runtime of the full DutiesRunnable
|
// Emit the runtime of the full DutiesRunnable
|
||||||
params.getEmitter().emit(
|
params.getEmitter().emit(
|
||||||
new ServiceMetricEvent.Builder()
|
new ServiceMetricEvent.Builder()
|
||||||
|
|
|
@ -21,6 +21,8 @@ package org.apache.druid.server.coordinator;
|
||||||
|
|
||||||
import com.google.common.collect.HashBasedTable;
|
import com.google.common.collect.HashBasedTable;
|
||||||
import com.google.common.collect.Table;
|
import com.google.common.collect.Table;
|
||||||
|
import it.unimi.dsi.fastutil.objects.Object2IntMap;
|
||||||
|
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
|
||||||
import it.unimi.dsi.fastutil.objects.Object2LongMap;
|
import it.unimi.dsi.fastutil.objects.Object2LongMap;
|
||||||
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
|
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
|
||||||
import org.apache.druid.client.ImmutableDruidServer;
|
import org.apache.druid.client.ImmutableDruidServer;
|
||||||
|
@ -79,6 +81,7 @@ public class SegmentReplicantLookup
|
||||||
|
|
||||||
private final Table<SegmentId, String, Integer> segmentsInCluster;
|
private final Table<SegmentId, String, Integer> segmentsInCluster;
|
||||||
private final Table<SegmentId, String, Integer> loadingSegments;
|
private final Table<SegmentId, String, Integer> loadingSegments;
|
||||||
|
private final Map<SegmentId, Integer> segmentIdToReplicationFactor = new HashMap<>();
|
||||||
private final DruidCluster cluster;
|
private final DruidCluster cluster;
|
||||||
|
|
||||||
private SegmentReplicantLookup(
|
private SegmentReplicantLookup(
|
||||||
|
@ -114,6 +117,18 @@ public class SegmentReplicantLookup
|
||||||
return (retVal == null) ? 0 : retVal;
|
return (retVal == null) ? 0 : retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Refactor this setter, as this class is following a singleton pattern with only getters, and this breaks convention.
|
||||||
|
// This would be revamped in https://github.com/apache/druid/pull/13197
|
||||||
|
public void setReplicationFactor(SegmentId segmentId, Integer requiredReplicas)
|
||||||
|
{
|
||||||
|
segmentIdToReplicationFactor.put(segmentId, requiredReplicas);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Object2IntMap<SegmentId> getSegmentIdToReplicationFactor()
|
||||||
|
{
|
||||||
|
return new Object2IntOpenHashMap<>(segmentIdToReplicationFactor);
|
||||||
|
}
|
||||||
|
|
||||||
private int getLoadingReplicants(SegmentId segmentId, String tier)
|
private int getLoadingReplicants(SegmentId segmentId, String tier)
|
||||||
{
|
{
|
||||||
Integer retVal = loadingSegments.get(segmentId, tier);
|
Integer retVal = loadingSegments.get(segmentId, tier);
|
||||||
|
|
|
@ -77,6 +77,8 @@ public abstract class LoadRule implements Rule
|
||||||
targetReplicants.putAll(getTieredReplicants());
|
targetReplicants.putAll(getTieredReplicants());
|
||||||
currentReplicants.putAll(params.getSegmentReplicantLookup().getClusterTiers(segment.getId()));
|
currentReplicants.putAll(params.getSegmentReplicantLookup().getClusterTiers(segment.getId()));
|
||||||
|
|
||||||
|
params.getSegmentReplicantLookup().setReplicationFactor(segment.getId(), getReplicationFactor());
|
||||||
|
|
||||||
final CoordinatorStats stats = new CoordinatorStats();
|
final CoordinatorStats stats = new CoordinatorStats();
|
||||||
assign(params, segment, stats);
|
assign(params, segment, stats);
|
||||||
|
|
||||||
|
@ -93,6 +95,11 @@ public abstract class LoadRule implements Rule
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int getReplicationFactor()
|
||||||
|
{
|
||||||
|
return getTieredReplicants().values().stream().reduce(0, Integer::sum);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean canLoadSegments()
|
public boolean canLoadSegments()
|
||||||
{
|
{
|
||||||
|
|
|
@ -32,13 +32,14 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
|
||||||
import org.apache.druid.indexing.overlord.Segments;
|
import org.apache.druid.indexing.overlord.Segments;
|
||||||
import org.apache.druid.metadata.SegmentsMetadataManager;
|
import org.apache.druid.metadata.SegmentsMetadataManager;
|
||||||
import org.apache.druid.server.JettyUtils;
|
import org.apache.druid.server.JettyUtils;
|
||||||
|
import org.apache.druid.server.coordinator.DruidCoordinator;
|
||||||
import org.apache.druid.server.http.security.DatasourceResourceFilter;
|
import org.apache.druid.server.http.security.DatasourceResourceFilter;
|
||||||
import org.apache.druid.server.security.AuthorizationUtils;
|
import org.apache.druid.server.security.AuthorizationUtils;
|
||||||
import org.apache.druid.server.security.AuthorizerMapper;
|
import org.apache.druid.server.security.AuthorizerMapper;
|
||||||
import org.apache.druid.server.security.ResourceAction;
|
import org.apache.druid.server.security.ResourceAction;
|
||||||
import org.apache.druid.timeline.DataSegment;
|
import org.apache.druid.timeline.DataSegment;
|
||||||
import org.apache.druid.timeline.SegmentId;
|
import org.apache.druid.timeline.SegmentId;
|
||||||
import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
|
import org.apache.druid.timeline.SegmentStatusInCluster;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
@ -69,18 +70,21 @@ public class MetadataResource
|
||||||
private final SegmentsMetadataManager segmentsMetadataManager;
|
private final SegmentsMetadataManager segmentsMetadataManager;
|
||||||
private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
|
private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
|
||||||
private final AuthorizerMapper authorizerMapper;
|
private final AuthorizerMapper authorizerMapper;
|
||||||
|
private final DruidCoordinator coordinator;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public MetadataResource(
|
public MetadataResource(
|
||||||
SegmentsMetadataManager segmentsMetadataManager,
|
SegmentsMetadataManager segmentsMetadataManager,
|
||||||
IndexerMetadataStorageCoordinator metadataStorageCoordinator,
|
IndexerMetadataStorageCoordinator metadataStorageCoordinator,
|
||||||
AuthorizerMapper authorizerMapper,
|
AuthorizerMapper authorizerMapper,
|
||||||
|
DruidCoordinator coordinator,
|
||||||
@Json ObjectMapper jsonMapper
|
@Json ObjectMapper jsonMapper
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.segmentsMetadataManager = segmentsMetadataManager;
|
this.segmentsMetadataManager = segmentsMetadataManager;
|
||||||
this.metadataStorageCoordinator = metadataStorageCoordinator;
|
this.metadataStorageCoordinator = metadataStorageCoordinator;
|
||||||
this.authorizerMapper = authorizerMapper;
|
this.authorizerMapper = authorizerMapper;
|
||||||
|
this.coordinator = coordinator;
|
||||||
}
|
}
|
||||||
|
|
||||||
@GET
|
@GET
|
||||||
|
@ -140,7 +144,7 @@ public class MetadataResource
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
if (includeOvershadowedStatus != null) {
|
if (includeOvershadowedStatus != null) {
|
||||||
return getAllUsedSegmentsWithOvershadowedStatus(req, dataSources);
|
return getAllUsedSegmentsWithAdditionalDetails(req, dataSources);
|
||||||
}
|
}
|
||||||
|
|
||||||
Collection<ImmutableDruidDataSource> dataSourcesWithUsedSegments =
|
Collection<ImmutableDruidDataSource> dataSourcesWithUsedSegments =
|
||||||
|
@ -165,7 +169,7 @@ public class MetadataResource
|
||||||
return builder.entity(authorizedSegments).build();
|
return builder.entity(authorizedSegments).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Response getAllUsedSegmentsWithOvershadowedStatus(
|
private Response getAllUsedSegmentsWithAdditionalDetails(
|
||||||
HttpServletRequest req,
|
HttpServletRequest req,
|
||||||
@Nullable Set<String> dataSources
|
@Nullable Set<String> dataSources
|
||||||
)
|
)
|
||||||
|
@ -184,15 +188,30 @@ public class MetadataResource
|
||||||
.flatMap(t -> t.getSegments().stream());
|
.flatMap(t -> t.getSegments().stream());
|
||||||
final Set<DataSegment> overshadowedSegments = dataSourcesSnapshot.getOvershadowedSegments();
|
final Set<DataSegment> overshadowedSegments = dataSourcesSnapshot.getOvershadowedSegments();
|
||||||
|
|
||||||
final Stream<SegmentWithOvershadowedStatus> usedSegmentsWithOvershadowedStatus = usedSegments
|
final Stream<SegmentStatusInCluster> segmentStatus = usedSegments
|
||||||
.map(segment -> new SegmentWithOvershadowedStatus(segment, overshadowedSegments.contains(segment)));
|
.map(segment -> {
|
||||||
|
boolean isOvershadowed = overshadowedSegments.contains(segment);
|
||||||
|
Integer replicationFactor;
|
||||||
|
if (isOvershadowed) {
|
||||||
|
// If the segment is overshadowed, the replication factor won't be present in the coordinator, but we know
|
||||||
|
// that it should be 0 as we will unload it soon.
|
||||||
|
replicationFactor = 0;
|
||||||
|
} else {
|
||||||
|
replicationFactor = coordinator.getReplicationFactorForSegment(segment.getId());
|
||||||
|
}
|
||||||
|
return new SegmentStatusInCluster(
|
||||||
|
segment,
|
||||||
|
isOvershadowed,
|
||||||
|
replicationFactor
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
final Function<SegmentWithOvershadowedStatus, Iterable<ResourceAction>> raGenerator = segment -> Collections
|
final Function<SegmentStatusInCluster, Iterable<ResourceAction>> raGenerator = segment -> Collections
|
||||||
.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource()));
|
.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource()));
|
||||||
|
|
||||||
final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
|
final Iterable<SegmentStatusInCluster> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
|
||||||
req,
|
req,
|
||||||
usedSegmentsWithOvershadowedStatus::iterator,
|
segmentStatus::iterator,
|
||||||
raGenerator,
|
raGenerator,
|
||||||
authorizerMapper
|
authorizerMapper
|
||||||
);
|
);
|
||||||
|
|
|
@ -0,0 +1,172 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.server.http;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import org.apache.druid.client.DataSourcesSnapshot;
|
||||||
|
import org.apache.druid.client.ImmutableDruidDataSource;
|
||||||
|
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
|
||||||
|
import org.apache.druid.java.util.common.Intervals;
|
||||||
|
import org.apache.druid.metadata.SegmentsMetadataManager;
|
||||||
|
import org.apache.druid.server.coordinator.DruidCoordinator;
|
||||||
|
import org.apache.druid.server.security.AuthConfig;
|
||||||
|
import org.apache.druid.server.security.AuthTestUtils;
|
||||||
|
import org.apache.druid.server.security.AuthenticationResult;
|
||||||
|
import org.apache.druid.timeline.DataSegment;
|
||||||
|
import org.apache.druid.timeline.SegmentStatusInCluster;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.ws.rs.core.Response;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
public class MetadataResourceTest
|
||||||
|
{
|
||||||
|
private static final String DATASOURCE1 = "datasource1";
|
||||||
|
private static final String DATASOURCE2 = "datasource2";
|
||||||
|
|
||||||
|
private MetadataResource metadataResource;
|
||||||
|
|
||||||
|
private SegmentsMetadataManager segmentsMetadataManager;
|
||||||
|
private DruidCoordinator coordinator;
|
||||||
|
private HttpServletRequest request;
|
||||||
|
|
||||||
|
private final DataSegment dataSegment1 = new DataSegment(
|
||||||
|
DATASOURCE1,
|
||||||
|
Intervals.of("2010-01-01/P1D"),
|
||||||
|
"v0",
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
0x9,
|
||||||
|
10
|
||||||
|
);
|
||||||
|
|
||||||
|
private final DataSegment dataSegment2 = new DataSegment(
|
||||||
|
DATASOURCE1,
|
||||||
|
Intervals.of("2010-01-22/P1D"),
|
||||||
|
"v0",
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
0x9,
|
||||||
|
20
|
||||||
|
);
|
||||||
|
|
||||||
|
private final DataSegment dataSegment3 = new DataSegment(
|
||||||
|
DATASOURCE2,
|
||||||
|
Intervals.of("2010-01-01/P1M"),
|
||||||
|
"v0",
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
0x9,
|
||||||
|
30
|
||||||
|
);
|
||||||
|
|
||||||
|
private final DataSegment dataSegment4 = new DataSegment(
|
||||||
|
DATASOURCE2,
|
||||||
|
Intervals.of("2010-01-02/P1D"),
|
||||||
|
"v0",
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
0x9,
|
||||||
|
35
|
||||||
|
);
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp()
|
||||||
|
{
|
||||||
|
request = mock(HttpServletRequest.class);
|
||||||
|
doReturn(mock(AuthenticationResult.class)).when(request).getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT);
|
||||||
|
|
||||||
|
segmentsMetadataManager = mock(SegmentsMetadataManager.class);
|
||||||
|
ImmutableDruidDataSource druidDataSource1 = new ImmutableDruidDataSource(
|
||||||
|
DATASOURCE1,
|
||||||
|
ImmutableMap.of(),
|
||||||
|
ImmutableList.of(
|
||||||
|
dataSegment1,
|
||||||
|
dataSegment2
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
ImmutableDruidDataSource druidDataSource2 = new ImmutableDruidDataSource(
|
||||||
|
DATASOURCE1,
|
||||||
|
ImmutableMap.of(),
|
||||||
|
ImmutableList.of(
|
||||||
|
dataSegment3,
|
||||||
|
dataSegment4
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
DataSourcesSnapshot dataSourcesSnapshot = mock(DataSourcesSnapshot.class);
|
||||||
|
doReturn(dataSourcesSnapshot).when(segmentsMetadataManager).getSnapshotOfDataSourcesWithAllUsedSegments();
|
||||||
|
doReturn(ImmutableList.of(druidDataSource1, druidDataSource2)).when(dataSourcesSnapshot).getDataSourcesWithAllUsedSegments();
|
||||||
|
|
||||||
|
coordinator = mock(DruidCoordinator.class);
|
||||||
|
doReturn(2).when(coordinator).getReplicationFactorForSegment(dataSegment1.getId());
|
||||||
|
doReturn(null).when(coordinator).getReplicationFactorForSegment(dataSegment2.getId());
|
||||||
|
doReturn(1).when(coordinator).getReplicationFactorForSegment(dataSegment3.getId());
|
||||||
|
doReturn(1).when(coordinator).getReplicationFactorForSegment(dataSegment4.getId());
|
||||||
|
doReturn(ImmutableSet.of(dataSegment4)).when(dataSourcesSnapshot).getOvershadowedSegments();
|
||||||
|
|
||||||
|
metadataResource = new MetadataResource(segmentsMetadataManager, mock(IndexerMetadataStorageCoordinator.class), AuthTestUtils.TEST_AUTHORIZER_MAPPER, coordinator, new ObjectMapper());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetAllSegmentsWithOvershadowedStatus()
|
||||||
|
{
|
||||||
|
Response response = metadataResource.getAllUsedSegments(
|
||||||
|
request,
|
||||||
|
null,
|
||||||
|
"includeOvershadowedStatus"
|
||||||
|
);
|
||||||
|
|
||||||
|
List<SegmentStatusInCluster> resultList = materializeResponse(response);
|
||||||
|
Assert.assertEquals(resultList.size(), 4);
|
||||||
|
Assert.assertEquals(new SegmentStatusInCluster(dataSegment1, false, 2), resultList.get(0));
|
||||||
|
Assert.assertEquals(new SegmentStatusInCluster(dataSegment2, false, null), resultList.get(1));
|
||||||
|
Assert.assertEquals(new SegmentStatusInCluster(dataSegment3, false, 1), resultList.get(2));
|
||||||
|
// Replication factor should be 0 as the segment is overshadowed
|
||||||
|
Assert.assertEquals(new SegmentStatusInCluster(dataSegment4, true, 0), resultList.get(3));
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<SegmentStatusInCluster> materializeResponse(Response response)
|
||||||
|
{
|
||||||
|
Iterable<SegmentStatusInCluster> resultIterator = (Iterable<SegmentStatusInCluster>) response.getEntity();
|
||||||
|
List<SegmentStatusInCluster> segmentStatusInClusters = new ArrayList<>();
|
||||||
|
resultIterator.forEach(segmentStatusInClusters::add);
|
||||||
|
return segmentStatusInClusters;
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,6 +22,8 @@ package org.apache.druid.sql.calcite.schema;
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.cache.Cache;
|
||||||
|
import com.google.common.cache.CacheBuilder;
|
||||||
import com.google.common.collect.ImmutableSortedSet;
|
import com.google.common.collect.ImmutableSortedSet;
|
||||||
import com.google.common.util.concurrent.Uninterruptibles;
|
import com.google.common.util.concurrent.Uninterruptibles;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
@ -40,7 +42,8 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||||
import org.apache.druid.metadata.SegmentsMetadataManager;
|
import org.apache.druid.metadata.SegmentsMetadataManager;
|
||||||
import org.apache.druid.sql.calcite.planner.SegmentMetadataCacheConfig;
|
import org.apache.druid.sql.calcite.planner.SegmentMetadataCacheConfig;
|
||||||
import org.apache.druid.timeline.DataSegment;
|
import org.apache.druid.timeline.DataSegment;
|
||||||
import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
|
import org.apache.druid.timeline.SegmentId;
|
||||||
|
import org.apache.druid.timeline.SegmentStatusInCluster;
|
||||||
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||||
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -76,7 +79,12 @@ public class MetadataSegmentView
|
||||||
* from other threads.
|
* from other threads.
|
||||||
*/
|
*/
|
||||||
@MonotonicNonNull
|
@MonotonicNonNull
|
||||||
private volatile ImmutableSortedSet<SegmentWithOvershadowedStatus> publishedSegments = null;
|
private volatile ImmutableSortedSet<SegmentStatusInCluster> publishedSegments = null;
|
||||||
|
/**
|
||||||
|
* Caches the replication factor for segment IDs. In case of coordinator restarts or leadership re-elections, the coordinator API returns `null` replication factor until load rules are evaluated.
|
||||||
|
* The cache can be used during these periods to continue serving the previously fetched values.
|
||||||
|
*/
|
||||||
|
private final Cache<SegmentId, Integer> segmentIdToReplicationFactor;
|
||||||
private final ScheduledExecutorService scheduledExec;
|
private final ScheduledExecutorService scheduledExec;
|
||||||
private final long pollPeriodInMS;
|
private final long pollPeriodInMS;
|
||||||
private final LifecycleLock lifecycleLock = new LifecycleLock();
|
private final LifecycleLock lifecycleLock = new LifecycleLock();
|
||||||
|
@ -97,6 +105,9 @@ public class MetadataSegmentView
|
||||||
this.isCacheEnabled = config.isMetadataSegmentCacheEnable();
|
this.isCacheEnabled = config.isMetadataSegmentCacheEnable();
|
||||||
this.pollPeriodInMS = config.getMetadataSegmentPollPeriod();
|
this.pollPeriodInMS = config.getMetadataSegmentPollPeriod();
|
||||||
this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
|
this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
|
||||||
|
this.segmentIdToReplicationFactor = CacheBuilder.newBuilder()
|
||||||
|
.expireAfterAccess(10, TimeUnit.MINUTES)
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@LifecycleStart
|
@LifecycleStart
|
||||||
|
@ -133,27 +144,34 @@ public class MetadataSegmentView
|
||||||
private void poll()
|
private void poll()
|
||||||
{
|
{
|
||||||
log.info("polling published segments from coordinator");
|
log.info("polling published segments from coordinator");
|
||||||
final JsonParserIterator<SegmentWithOvershadowedStatus> metadataSegments = getMetadataSegments(
|
final JsonParserIterator<SegmentStatusInCluster> metadataSegments = getMetadataSegments(
|
||||||
coordinatorDruidLeaderClient,
|
coordinatorDruidLeaderClient,
|
||||||
jsonMapper,
|
jsonMapper,
|
||||||
segmentWatcherConfig.getWatchedDataSources()
|
segmentWatcherConfig.getWatchedDataSources()
|
||||||
);
|
);
|
||||||
|
|
||||||
final ImmutableSortedSet.Builder<SegmentWithOvershadowedStatus> builder = ImmutableSortedSet.naturalOrder();
|
final ImmutableSortedSet.Builder<SegmentStatusInCluster> builder = ImmutableSortedSet.naturalOrder();
|
||||||
while (metadataSegments.hasNext()) {
|
while (metadataSegments.hasNext()) {
|
||||||
final SegmentWithOvershadowedStatus segment = metadataSegments.next();
|
final SegmentStatusInCluster segment = metadataSegments.next();
|
||||||
final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment());
|
final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment());
|
||||||
final SegmentWithOvershadowedStatus segmentWithOvershadowedStatus = new SegmentWithOvershadowedStatus(
|
Integer replicationFactor = segment.getReplicationFactor();
|
||||||
|
if (replicationFactor == null) {
|
||||||
|
replicationFactor = segmentIdToReplicationFactor.getIfPresent(segment.getDataSegment().getId());
|
||||||
|
} else {
|
||||||
|
segmentIdToReplicationFactor.put(segment.getDataSegment().getId(), segment.getReplicationFactor());
|
||||||
|
}
|
||||||
|
final SegmentStatusInCluster segmentStatusInCluster = new SegmentStatusInCluster(
|
||||||
interned,
|
interned,
|
||||||
segment.isOvershadowed()
|
segment.isOvershadowed(),
|
||||||
|
replicationFactor
|
||||||
);
|
);
|
||||||
builder.add(segmentWithOvershadowedStatus);
|
builder.add(segmentStatusInCluster);
|
||||||
}
|
}
|
||||||
publishedSegments = builder.build();
|
publishedSegments = builder.build();
|
||||||
cachePopulated.countDown();
|
cachePopulated.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
Iterator<SegmentWithOvershadowedStatus> getPublishedSegments()
|
Iterator<SegmentStatusInCluster> getPublishedSegments()
|
||||||
{
|
{
|
||||||
if (isCacheEnabled) {
|
if (isCacheEnabled) {
|
||||||
Uninterruptibles.awaitUninterruptibly(cachePopulated);
|
Uninterruptibles.awaitUninterruptibly(cachePopulated);
|
||||||
|
@ -168,7 +186,7 @@ public class MetadataSegmentView
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note that coordinator must be up to get segments
|
// Note that coordinator must be up to get segments
|
||||||
private JsonParserIterator<SegmentWithOvershadowedStatus> getMetadataSegments(
|
private JsonParserIterator<SegmentStatusInCluster> getMetadataSegments(
|
||||||
DruidLeaderClient coordinatorClient,
|
DruidLeaderClient coordinatorClient,
|
||||||
ObjectMapper jsonMapper,
|
ObjectMapper jsonMapper,
|
||||||
Set<String> watchedDataSources
|
Set<String> watchedDataSources
|
||||||
|
@ -188,7 +206,7 @@ public class MetadataSegmentView
|
||||||
|
|
||||||
return SystemSchema.getThingsFromLeaderNode(
|
return SystemSchema.getThingsFromLeaderNode(
|
||||||
query,
|
query,
|
||||||
new TypeReference<SegmentWithOvershadowedStatus>()
|
new TypeReference<SegmentStatusInCluster>()
|
||||||
{
|
{
|
||||||
},
|
},
|
||||||
coordinatorClient,
|
coordinatorClient,
|
||||||
|
|
|
@ -81,7 +81,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
|
||||||
import org.apache.druid.sql.calcite.table.RowSignatures;
|
import org.apache.druid.sql.calcite.table.RowSignatures;
|
||||||
import org.apache.druid.timeline.DataSegment;
|
import org.apache.druid.timeline.DataSegment;
|
||||||
import org.apache.druid.timeline.SegmentId;
|
import org.apache.druid.timeline.SegmentId;
|
||||||
import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
|
import org.apache.druid.timeline.SegmentStatusInCluster;
|
||||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
@ -106,8 +106,8 @@ public class SystemSchema extends AbstractSchema
|
||||||
private static final String TASKS_TABLE = "tasks";
|
private static final String TASKS_TABLE = "tasks";
|
||||||
private static final String SUPERVISOR_TABLE = "supervisors";
|
private static final String SUPERVISOR_TABLE = "supervisors";
|
||||||
|
|
||||||
private static final Function<SegmentWithOvershadowedStatus, Iterable<ResourceAction>>
|
private static final Function<SegmentStatusInCluster, Iterable<ResourceAction>>
|
||||||
SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR = segment ->
|
SEGMENT_STATUS_IN_CLUSTER_RA_GENERATOR = segment ->
|
||||||
Collections.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(
|
Collections.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(
|
||||||
segment.getDataSegment().getDataSource())
|
segment.getDataSegment().getDataSource())
|
||||||
);
|
);
|
||||||
|
@ -117,6 +117,8 @@ public class SystemSchema extends AbstractSchema
|
||||||
segment.getDataSource())
|
segment.getDataSource())
|
||||||
);
|
);
|
||||||
|
|
||||||
|
private static final long REPLICATION_FACTOR_UNKNOWN = -1L;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Booleans constants represented as long type,
|
* Booleans constants represented as long type,
|
||||||
* where 1 = true and 0 = false to make it easy to count number of segments
|
* where 1 = true and 0 = false to make it easy to count number of segments
|
||||||
|
@ -150,6 +152,7 @@ public class SystemSchema extends AbstractSchema
|
||||||
.add("dimensions", ColumnType.STRING)
|
.add("dimensions", ColumnType.STRING)
|
||||||
.add("metrics", ColumnType.STRING)
|
.add("metrics", ColumnType.STRING)
|
||||||
.add("last_compaction_state", ColumnType.STRING)
|
.add("last_compaction_state", ColumnType.STRING)
|
||||||
|
.add("replication_factor", ColumnType.LONG)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
static final RowSignature SERVERS_SIGNATURE = RowSignature
|
static final RowSignature SERVERS_SIGNATURE = RowSignature
|
||||||
|
@ -288,7 +291,7 @@ public class SystemSchema extends AbstractSchema
|
||||||
|
|
||||||
// Get published segments from metadata segment cache (if enabled in SQL planner config), else directly from
|
// Get published segments from metadata segment cache (if enabled in SQL planner config), else directly from
|
||||||
// Coordinator.
|
// Coordinator.
|
||||||
final Iterator<SegmentWithOvershadowedStatus> metadataStoreSegments = metadataView.getPublishedSegments();
|
final Iterator<SegmentStatusInCluster> metadataStoreSegments = metadataView.getPublishedSegments();
|
||||||
|
|
||||||
final Set<SegmentId> segmentsAlreadySeen = Sets.newHashSetWithExpectedSize(druidSchema.cache().getTotalSegments());
|
final Set<SegmentId> segmentsAlreadySeen = Sets.newHashSetWithExpectedSize(druidSchema.cache().getTotalSegments());
|
||||||
|
|
||||||
|
@ -326,7 +329,10 @@ public class SystemSchema extends AbstractSchema
|
||||||
segment.getShardSpec() == null ? null : jsonMapper.writeValueAsString(segment.getShardSpec()),
|
segment.getShardSpec() == null ? null : jsonMapper.writeValueAsString(segment.getShardSpec()),
|
||||||
segment.getDimensions() == null ? null : jsonMapper.writeValueAsString(segment.getDimensions()),
|
segment.getDimensions() == null ? null : jsonMapper.writeValueAsString(segment.getDimensions()),
|
||||||
segment.getMetrics() == null ? null : jsonMapper.writeValueAsString(segment.getMetrics()),
|
segment.getMetrics() == null ? null : jsonMapper.writeValueAsString(segment.getMetrics()),
|
||||||
segment.getLastCompactionState() == null ? null : jsonMapper.writeValueAsString(segment.getLastCompactionState())
|
segment.getLastCompactionState() == null ? null : jsonMapper.writeValueAsString(segment.getLastCompactionState()),
|
||||||
|
// If the value is null, the load rules might have not evaluated yet, and we don't know the replication factor.
|
||||||
|
// This should be automatically updated in the next refesh with Coordinator.
|
||||||
|
val.getReplicationFactor() == null ? REPLICATION_FACTOR_UNKNOWN : (long) val.getReplicationFactor()
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
catch (JsonProcessingException e) {
|
catch (JsonProcessingException e) {
|
||||||
|
@ -368,7 +374,8 @@ public class SystemSchema extends AbstractSchema
|
||||||
val.getValue().getSegment().getShardSpec() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getShardSpec()),
|
val.getValue().getSegment().getShardSpec() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getShardSpec()),
|
||||||
val.getValue().getSegment().getDimensions() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getDimensions()),
|
val.getValue().getSegment().getDimensions() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getDimensions()),
|
||||||
val.getValue().getSegment().getMetrics() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getMetrics()),
|
val.getValue().getSegment().getMetrics() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getMetrics()),
|
||||||
null // unpublished segments from realtime tasks will not be compacted yet
|
null, // unpublished segments from realtime tasks will not be compacted yet
|
||||||
|
REPLICATION_FACTOR_UNKNOWN // If the segment is unpublished, we won't have this information yet.
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
catch (JsonProcessingException e) {
|
catch (JsonProcessingException e) {
|
||||||
|
@ -384,8 +391,8 @@ public class SystemSchema extends AbstractSchema
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Iterator<SegmentWithOvershadowedStatus> getAuthorizedPublishedSegments(
|
private Iterator<SegmentStatusInCluster> getAuthorizedPublishedSegments(
|
||||||
Iterator<SegmentWithOvershadowedStatus> it,
|
Iterator<SegmentStatusInCluster> it,
|
||||||
DataContext root
|
DataContext root
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
@ -394,11 +401,11 @@ public class SystemSchema extends AbstractSchema
|
||||||
"authenticationResult in dataContext"
|
"authenticationResult in dataContext"
|
||||||
);
|
);
|
||||||
|
|
||||||
final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils
|
final Iterable<SegmentStatusInCluster> authorizedSegments = AuthorizationUtils
|
||||||
.filterAuthorizedResources(
|
.filterAuthorizedResources(
|
||||||
authenticationResult,
|
authenticationResult,
|
||||||
() -> it,
|
() -> it,
|
||||||
SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR,
|
SEGMENT_STATUS_IN_CLUSTER_RA_GENERATOR,
|
||||||
authorizerMapper
|
authorizerMapper
|
||||||
);
|
);
|
||||||
return authorizedSegments.iterator();
|
return authorizedSegments.iterator();
|
||||||
|
|
|
@ -99,7 +99,7 @@ import org.apache.druid.sql.calcite.util.TestServerInventoryView;
|
||||||
import org.apache.druid.timeline.CompactionState;
|
import org.apache.druid.timeline.CompactionState;
|
||||||
import org.apache.druid.timeline.DataSegment;
|
import org.apache.druid.timeline.DataSegment;
|
||||||
import org.apache.druid.timeline.SegmentId;
|
import org.apache.druid.timeline.SegmentId;
|
||||||
import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
|
import org.apache.druid.timeline.SegmentStatusInCluster;
|
||||||
import org.apache.druid.timeline.partition.NumberedShardSpec;
|
import org.apache.druid.timeline.partition.NumberedShardSpec;
|
||||||
import org.easymock.EasyMock;
|
import org.easymock.EasyMock;
|
||||||
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
|
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
|
||||||
|
@ -542,7 +542,7 @@ public class SystemSchemaTest extends CalciteTestBase
|
||||||
final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl());
|
final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl());
|
||||||
final List<RelDataTypeField> fields = rowType.getFieldList();
|
final List<RelDataTypeField> fields = rowType.getFieldList();
|
||||||
|
|
||||||
Assert.assertEquals(18, fields.size());
|
Assert.assertEquals(19, fields.size());
|
||||||
|
|
||||||
final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks");
|
final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks");
|
||||||
final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl());
|
final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl());
|
||||||
|
@ -564,12 +564,12 @@ public class SystemSchemaTest extends CalciteTestBase
|
||||||
public void testSegmentsTable() throws Exception
|
public void testSegmentsTable() throws Exception
|
||||||
{
|
{
|
||||||
final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, new ObjectMapper(), authMapper);
|
final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, new ObjectMapper(), authMapper);
|
||||||
final Set<SegmentWithOvershadowedStatus> publishedSegments = new HashSet<>(Arrays.asList(
|
final Set<SegmentStatusInCluster> publishedSegments = new HashSet<>(Arrays.asList(
|
||||||
new SegmentWithOvershadowedStatus(publishedCompactedSegment1, true),
|
new SegmentStatusInCluster(publishedCompactedSegment1, true, 2),
|
||||||
new SegmentWithOvershadowedStatus(publishedCompactedSegment2, false),
|
new SegmentStatusInCluster(publishedCompactedSegment2, false, 0),
|
||||||
new SegmentWithOvershadowedStatus(publishedUncompactedSegment3, false),
|
new SegmentStatusInCluster(publishedUncompactedSegment3, false, 2),
|
||||||
new SegmentWithOvershadowedStatus(segment1, true),
|
new SegmentStatusInCluster(segment1, true, 2),
|
||||||
new SegmentWithOvershadowedStatus(segment2, false)
|
new SegmentStatusInCluster(segment2, false, 0)
|
||||||
));
|
));
|
||||||
|
|
||||||
EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once();
|
EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once();
|
||||||
|
@ -598,7 +598,8 @@ public class SystemSchemaTest extends CalciteTestBase
|
||||||
1L, //is_available
|
1L, //is_available
|
||||||
0L, //is_realtime
|
0L, //is_realtime
|
||||||
1L, //is_overshadowed
|
1L, //is_overshadowed
|
||||||
null //is_compacted
|
null, //is_compacted
|
||||||
|
2L // replication_factor
|
||||||
);
|
);
|
||||||
|
|
||||||
verifyRow(
|
verifyRow(
|
||||||
|
@ -612,7 +613,8 @@ public class SystemSchemaTest extends CalciteTestBase
|
||||||
1L, //is_available
|
1L, //is_available
|
||||||
0L, //is_realtime
|
0L, //is_realtime
|
||||||
0L, //is_overshadowed,
|
0L, //is_overshadowed,
|
||||||
null //is_compacted
|
null, //is_compacted
|
||||||
|
0L // replication_factor
|
||||||
);
|
);
|
||||||
|
|
||||||
//segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2
|
//segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2
|
||||||
|
@ -627,7 +629,8 @@ public class SystemSchemaTest extends CalciteTestBase
|
||||||
1L, //is_available
|
1L, //is_available
|
||||||
0L, //is_realtime
|
0L, //is_realtime
|
||||||
0L, //is_overshadowed
|
0L, //is_overshadowed
|
||||||
null //is_compacted
|
null, //is_compacted
|
||||||
|
-1L // replication_factor
|
||||||
);
|
);
|
||||||
|
|
||||||
verifyRow(
|
verifyRow(
|
||||||
|
@ -641,7 +644,8 @@ public class SystemSchemaTest extends CalciteTestBase
|
||||||
1L, //is_available
|
1L, //is_available
|
||||||
1L, //is_realtime
|
1L, //is_realtime
|
||||||
0L, //is_overshadowed
|
0L, //is_overshadowed
|
||||||
null //is_compacted
|
null, //is_compacted
|
||||||
|
-1L // replication_factor
|
||||||
);
|
);
|
||||||
|
|
||||||
verifyRow(
|
verifyRow(
|
||||||
|
@ -655,7 +659,8 @@ public class SystemSchemaTest extends CalciteTestBase
|
||||||
1L, //is_available
|
1L, //is_available
|
||||||
1L, //is_realtime
|
1L, //is_realtime
|
||||||
0L, //is_overshadowed
|
0L, //is_overshadowed
|
||||||
null //is_compacted
|
null, //is_compacted
|
||||||
|
-1L // replication_factor
|
||||||
);
|
);
|
||||||
|
|
||||||
// wikipedia segments are published and unavailable, num_replicas is 0
|
// wikipedia segments are published and unavailable, num_replicas is 0
|
||||||
|
@ -671,7 +676,8 @@ public class SystemSchemaTest extends CalciteTestBase
|
||||||
0L, //is_available
|
0L, //is_available
|
||||||
0L, //is_realtime
|
0L, //is_realtime
|
||||||
1L, //is_overshadowed
|
1L, //is_overshadowed
|
||||||
expectedCompactionState //is_compacted
|
expectedCompactionState, //is_compacted
|
||||||
|
2L // replication_factor
|
||||||
);
|
);
|
||||||
|
|
||||||
verifyRow(
|
verifyRow(
|
||||||
|
@ -685,7 +691,8 @@ public class SystemSchemaTest extends CalciteTestBase
|
||||||
0L, //is_available
|
0L, //is_available
|
||||||
0L, //is_realtime
|
0L, //is_realtime
|
||||||
0L, //is_overshadowed
|
0L, //is_overshadowed
|
||||||
expectedCompactionState //is_compacted
|
expectedCompactionState, //is_compacted
|
||||||
|
0L // replication_factor
|
||||||
);
|
);
|
||||||
|
|
||||||
verifyRow(
|
verifyRow(
|
||||||
|
@ -699,7 +706,8 @@ public class SystemSchemaTest extends CalciteTestBase
|
||||||
0L, //is_available
|
0L, //is_available
|
||||||
0L, //is_realtime
|
0L, //is_realtime
|
||||||
0L, //is_overshadowed
|
0L, //is_overshadowed
|
||||||
null //is_compacted
|
null, //is_compacted
|
||||||
|
2L // replication_factor
|
||||||
);
|
);
|
||||||
|
|
||||||
// Verify value types.
|
// Verify value types.
|
||||||
|
@ -717,7 +725,8 @@ public class SystemSchemaTest extends CalciteTestBase
|
||||||
long isAvailable,
|
long isAvailable,
|
||||||
long isRealtime,
|
long isRealtime,
|
||||||
long isOvershadowed,
|
long isOvershadowed,
|
||||||
CompactionState compactionState
|
CompactionState compactionState,
|
||||||
|
long replicationFactor
|
||||||
) throws Exception
|
) throws Exception
|
||||||
{
|
{
|
||||||
Assert.assertEquals(segmentId, row[0].toString());
|
Assert.assertEquals(segmentId, row[0].toString());
|
||||||
|
@ -740,6 +749,7 @@ public class SystemSchemaTest extends CalciteTestBase
|
||||||
} else {
|
} else {
|
||||||
Assert.assertEquals(mapper.writeValueAsString(compactionState), row[17]);
|
Assert.assertEquals(mapper.writeValueAsString(compactionState), row[17]);
|
||||||
}
|
}
|
||||||
|
Assert.assertEquals(replicationFactor, row[18]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -644,6 +644,7 @@ num_segments
|
||||||
partition_num
|
partition_num
|
||||||
plaintext_port
|
plaintext_port
|
||||||
queue_insertion_time
|
queue_insertion_time
|
||||||
|
replication_factor
|
||||||
runner_status
|
runner_status
|
||||||
segment_id
|
segment_id
|
||||||
server_type
|
server_type
|
||||||
|
|
Loading…
Reference in New Issue