mirror of https://github.com/apache/druid.git
ensure the cleaning of overshadowed unloaded segments (#3048)
* ensure the cleaning of overshadowed unloaded segments * add testing plus comments
This commit is contained in:
parent
767190d5db
commit
545cdd63ab
|
@ -28,6 +28,7 @@ import io.druid.server.coordinator.CoordinatorStats;
|
||||||
import io.druid.server.coordinator.DruidCluster;
|
import io.druid.server.coordinator.DruidCluster;
|
||||||
import io.druid.server.coordinator.DruidCoordinator;
|
import io.druid.server.coordinator.DruidCoordinator;
|
||||||
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
|
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
|
||||||
|
import io.druid.server.coordinator.LoadQueuePeon;
|
||||||
import io.druid.server.coordinator.ServerHolder;
|
import io.druid.server.coordinator.ServerHolder;
|
||||||
import io.druid.timeline.DataSegment;
|
import io.druid.timeline.DataSegment;
|
||||||
import io.druid.timeline.TimelineObjectHolder;
|
import io.druid.timeline.TimelineObjectHolder;
|
||||||
|
@ -82,9 +83,37 @@ public class DruidCoordinatorCleanupOvershadowed implements DruidCoordinatorHelp
|
||||||
stats.addToGlobalStat("overShadowedCount", 1);
|
stats.addToGlobalStat("overShadowedCount", 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (LoadQueuePeon loadQueue : coordinator.getLoadManagementPeons().values()) {
|
||||||
|
for (DataSegment dataSegment : loadQueue.getSegmentsToLoad()) {
|
||||||
|
timeline = timelines.get(dataSegment.getDataSource());
|
||||||
|
if (timeline == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// Temporarily add queued segments to the timeline to see if they still need to be loaded.
|
||||||
|
timeline.add(
|
||||||
|
dataSegment.getInterval(),
|
||||||
|
dataSegment.getVersion(),
|
||||||
|
dataSegment.getShardSpec().createChunk(dataSegment)
|
||||||
|
);
|
||||||
|
for (TimelineObjectHolder<String, DataSegment> holder : timeline.findOvershadowed()) {
|
||||||
|
for (DataSegment segmentToRemove : holder.getObject().payloads()) {
|
||||||
|
if (segmentToRemove == dataSegment) {
|
||||||
|
coordinator.removeSegment(dataSegment);
|
||||||
|
stats.addToGlobalStat("overShadowedCount", 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Removing it to make sure that if two segment to load and they overshadow both get loaded.
|
||||||
|
timeline.remove(
|
||||||
|
dataSegment.getInterval(),
|
||||||
|
dataSegment.getVersion(),
|
||||||
|
dataSegment.getShardSpec().createChunk(dataSegment)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return params.buildFromExisting()
|
return params.buildFromExisting()
|
||||||
.withCoordinatorStats(stats)
|
.withCoordinatorStats(stats)
|
||||||
.build();
|
.build();
|
||||||
|
|
|
@ -0,0 +1,93 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.coordinator.helper;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import com.google.common.collect.MinMaxPriorityQueue;
|
||||||
|
import com.google.common.collect.Ordering;
|
||||||
|
import io.druid.client.ImmutableDruidDataSource;
|
||||||
|
import io.druid.client.ImmutableDruidServer;
|
||||||
|
import io.druid.server.coordinator.CoordinatorStats;
|
||||||
|
import io.druid.server.coordinator.DruidCluster;
|
||||||
|
import io.druid.server.coordinator.DruidCoordinator;
|
||||||
|
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
|
||||||
|
import io.druid.server.coordinator.LoadQueuePeon;
|
||||||
|
import io.druid.server.coordinator.ServerHolder;
|
||||||
|
import io.druid.timeline.DataSegment;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class DruidCoordinatorCleanupOvershadowedTest
|
||||||
|
{
|
||||||
|
DruidCoordinatorCleanupOvershadowed druidCoordinatorCleanupOvershadowed;
|
||||||
|
DruidCoordinator coordinator = EasyMock.createStrictMock(DruidCoordinator.class);
|
||||||
|
private List<DataSegment> availableSegments;
|
||||||
|
DateTime start = new DateTime("2012-01-01");
|
||||||
|
DruidCluster druidCluster;
|
||||||
|
private LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class);
|
||||||
|
private ImmutableDruidServer druidServer = EasyMock.createMock(ImmutableDruidServer.class);
|
||||||
|
private ImmutableDruidDataSource druidDataSource = EasyMock.createMock(ImmutableDruidDataSource.class);
|
||||||
|
private DataSegment segmentV0 = new DataSegment.Builder().dataSource("test")
|
||||||
|
.interval(new Interval(start, start.plusHours(1)))
|
||||||
|
.version("0")
|
||||||
|
.build();
|
||||||
|
private DataSegment segmentV1 = new DataSegment.Builder().dataSource("test")
|
||||||
|
.interval(new Interval(start, start.plusHours(1)))
|
||||||
|
.version("1")
|
||||||
|
.build();
|
||||||
|
@Test
|
||||||
|
public void testRun()
|
||||||
|
{
|
||||||
|
druidCoordinatorCleanupOvershadowed = new DruidCoordinatorCleanupOvershadowed(coordinator);
|
||||||
|
availableSegments = ImmutableList.of(segmentV1, segmentV0);
|
||||||
|
|
||||||
|
druidCluster = new DruidCluster(
|
||||||
|
ImmutableMap.of("normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(Arrays.asList(
|
||||||
|
new ServerHolder(druidServer, mockPeon
|
||||||
|
)))));
|
||||||
|
|
||||||
|
EasyMock.expect(druidServer.getDataSources())
|
||||||
|
.andReturn(ImmutableList.of(druidDataSource))
|
||||||
|
.anyTimes();
|
||||||
|
EasyMock.expect(druidDataSource.getSegments()).andReturn(ImmutableSet.<DataSegment>of(segmentV1)).anyTimes();
|
||||||
|
EasyMock.expect(druidDataSource.getName()).andReturn("test").anyTimes();
|
||||||
|
EasyMock.expect(coordinator.getLoadManagementPeons())
|
||||||
|
.andReturn(ImmutableMap.<String, LoadQueuePeon>of("testHost", mockPeon))
|
||||||
|
.anyTimes();
|
||||||
|
coordinator.removeSegment(segmentV0);
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(ImmutableSet.<DataSegment>of(segmentV0)).anyTimes();
|
||||||
|
EasyMock.replay(mockPeon, coordinator, druidServer, druidDataSource);
|
||||||
|
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder()
|
||||||
|
.withAvailableSegments(availableSegments)
|
||||||
|
.withCoordinatorStats(new CoordinatorStats())
|
||||||
|
.withDruidCluster(druidCluster)
|
||||||
|
.build();
|
||||||
|
druidCoordinatorCleanupOvershadowed.run(params);
|
||||||
|
EasyMock.verify(coordinator, druidDataSource, druidServer);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue