mirror of https://github.com/apache/druid.git
Add realization for updating version of derived segments in Materiali… (#8281)
* Add realization for updating version of derived segments in MaterializedView * add unit test, and change code style for the sake of ease of understanding
This commit is contained in:
parent
58fbb69113
commit
31732f0e21
|
@ -63,6 +63,7 @@ import java.util.Set;
|
|||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.IntSupplier;
|
||||
|
||||
public class MaterializedViewSupervisor implements Supervisor
|
||||
{
|
||||
|
@ -368,6 +369,20 @@ public class MaterializedViewSupervisor implements Supervisor
|
|||
MapDifference<Interval, String> difference = Maps.difference(maxCreatedDate, derivativeVersion);
|
||||
Map<Interval, String> toBuildInterval = new HashMap<>(difference.entriesOnlyOnLeft());
|
||||
Map<Interval, String> toDropInterval = new HashMap<>(difference.entriesOnlyOnRight());
|
||||
// update version of derived segments if isn't the max (created_date) of all base segments
|
||||
// prevent user supplied segments list did not match with segments list obtained from db
|
||||
Map<Interval, MapDifference.ValueDifference<String>> checkIfNewestVersion =
|
||||
new HashMap<>(difference.entriesDiffering());
|
||||
for (Map.Entry<Interval, MapDifference.ValueDifference<String>> entry : checkIfNewestVersion.entrySet()) {
|
||||
final String versionOfBase = maxCreatedDate.get(entry.getKey());
|
||||
final String versionOfDerivative = derivativeVersion.get(entry.getKey());
|
||||
final int baseCount = baseSegments.get(entry.getKey()).size();
|
||||
final IntSupplier usedCountSupplier = () ->
|
||||
metadataStorageCoordinator.getUsedSegmentsForInterval(spec.getBaseDataSource(), entry.getKey()).size();
|
||||
if (versionOfBase.compareTo(versionOfDerivative) > 0 && baseCount == usedCountSupplier.getAsInt()) {
|
||||
toBuildInterval.put(entry.getKey(), versionOfBase);
|
||||
}
|
||||
}
|
||||
// if some intervals are in running tasks and the versions are the same, remove it from toBuildInterval
|
||||
// if some intervals are in running tasks, but the versions are different, stop the task.
|
||||
for (Map.Entry<Interval, String> version : runningVersion.entrySet()) {
|
||||
|
|
|
@ -82,6 +82,7 @@ public class MaterializedViewSupervisorTest
|
|||
private SQLMetadataSegmentManager sqlMetadataSegmentManager;
|
||||
private TaskQueue taskQueue;
|
||||
private MaterializedViewSupervisor supervisor;
|
||||
private String derivativeDatasourceName;
|
||||
private final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
|
||||
|
||||
@Before
|
||||
|
@ -124,6 +125,7 @@ public class MaterializedViewSupervisorTest
|
|||
EasyMock.createMock(ChatHandlerProvider.class),
|
||||
new SupervisorStateManagerConfig()
|
||||
);
|
||||
derivativeDatasourceName = spec.getDataSourceName();
|
||||
supervisor = (MaterializedViewSupervisor) spec.createSupervisor();
|
||||
}
|
||||
|
||||
|
@ -152,13 +154,50 @@ public class MaterializedViewSupervisorTest
|
|||
new HashBasedNumberedShardSpec(0, 1, null, null),
|
||||
9,
|
||||
1024
|
||||
),
|
||||
new DataSegment(
|
||||
"base",
|
||||
Intervals.of("2015-01-03T00Z/2015-01-04T00Z"),
|
||||
"2015-01-04",
|
||||
ImmutableMap.of(),
|
||||
ImmutableList.of("dim1", "dim2"),
|
||||
ImmutableList.of("m1"),
|
||||
new HashBasedNumberedShardSpec(0, 1, null, null),
|
||||
9,
|
||||
1024
|
||||
)
|
||||
);
|
||||
Set<DataSegment> derivativeSegments = Sets.newHashSet(
|
||||
new DataSegment(
|
||||
derivativeDatasourceName,
|
||||
Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
|
||||
"2015-01-02",
|
||||
ImmutableMap.of(),
|
||||
ImmutableList.of("dim1", "dim2"),
|
||||
ImmutableList.of("m1"),
|
||||
new HashBasedNumberedShardSpec(0, 1, null, null),
|
||||
9,
|
||||
1024
|
||||
),
|
||||
new DataSegment(
|
||||
derivativeDatasourceName,
|
||||
Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
|
||||
"3015-01-01",
|
||||
ImmutableMap.of(),
|
||||
ImmutableList.of("dim1", "dim2"),
|
||||
ImmutableList.of("m1"),
|
||||
new HashBasedNumberedShardSpec(0, 1, null, null),
|
||||
9,
|
||||
1024
|
||||
)
|
||||
);
|
||||
indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments);
|
||||
indexerMetadataStorageCoordinator.announceHistoricalSegments(derivativeSegments);
|
||||
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
|
||||
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
|
||||
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
|
||||
Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> toBuildInterval = supervisor.checkSegments();
|
||||
Set<Interval> expectedToBuildInterval = Sets.newHashSet(Intervals.of("2015-01-01T00Z/2015-01-02T00Z"));
|
||||
Map<Interval, List<DataSegment>> expectedSegments = new HashMap<>();
|
||||
expectedSegments.put(
|
||||
Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
|
||||
|
@ -176,6 +215,23 @@ public class MaterializedViewSupervisorTest
|
|||
)
|
||||
)
|
||||
);
|
||||
expectedSegments.put(
|
||||
Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
|
||||
Collections.singletonList(
|
||||
new DataSegment(
|
||||
"base",
|
||||
Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
|
||||
"2015-01-03",
|
||||
ImmutableMap.of(),
|
||||
ImmutableList.of("dim1", "dim2"),
|
||||
ImmutableList.of("m1"),
|
||||
new HashBasedNumberedShardSpec(0, 1, null, null),
|
||||
9,
|
||||
1024
|
||||
)
|
||||
)
|
||||
);
|
||||
Assert.assertEquals(expectedToBuildInterval, toBuildInterval.lhs.keySet());
|
||||
Assert.assertEquals(expectedSegments, toBuildInterval.rhs);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue