Use interval comparator instead of bucketMonthComparator

Fix ordering when two segments have the same interval

Address review comments
Nishant 2016-01-13 21:30:41 +05:30
parent fc32e34e1e
commit fd6bf3fe22
6 changed files with 89 additions and 11 deletions
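
The change replaces the month-bucket ordering with an interval-based one: segments are sorted by interval, latest end first (ties on end broken by start), then by the segments' natural ordering so that two distinct segments covering the same interval both survive in sorted collections. A minimal sketch (not part of the commit) of the interval ordering; it assumes Comparators.intervalsByEndThenStart() from the java-util 0.27.7 bump below lives in com.metamx.common.guava.Comparators, which is not visible in these hunks:

    import com.google.common.collect.Ordering;
    import com.metamx.common.guava.Comparators;
    import org.joda.time.Interval;

    import java.util.Arrays;
    import java.util.List;

    public class IntervalOrderSketch
    {
      public static void main(String[] args)
      {
        List<Interval> intervals = Arrays.asList(
            new Interval("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z"),
            new Interval("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z"),
            new Interval("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z")
        );
        // intervalsByEndThenStart() sorts ascending by end, then by start;
        // reverse() puts the interval with the latest end first.
        List<Interval> ordered = Ordering.from(Comparators.intervalsByEndThenStart())
                                         .reverse()
                                         .sortedCopy(intervals);
        // Latest end first: 2016-01-11, then 2016-01-09 ending 12:00, then ending 11:00.
        System.out.println(ordered);
      }
    }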

pom.xml

@ -65,7 +65,7 @@
</scm>
<properties>
-    <metamx.java-util.version>0.27.6</metamx.java-util.version>
+    <metamx.java-util.version>0.27.7</metamx.java-util.version>
<apache.curator.version>2.9.1</apache.curator.version>
<jetty.version>9.2.5.v20141112</jetty.version>
<jersey.version>1.19</jersey.version>

DruidCoordinator.java

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.IAE;
@ -78,8 +79,10 @@ import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -94,6 +97,20 @@ import java.util.concurrent.atomic.AtomicReference;
public class DruidCoordinator
{
public static final String COORDINATOR_OWNER_NODE = "_COORDINATOR";
+  public static Comparator<DataSegment> SEGMENT_COMPARATOR = Ordering.from(Comparators.intervalsByEndThenStart())
+      .onResultOf(
+          new Function<DataSegment, Interval>()
+          {
+            @Override
+            public Interval apply(DataSegment segment)
+            {
+              return segment.getInterval();
+            }
+          })
+      .compound(Ordering.<DataSegment>natural())
+      .reverse();
private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class);
private final Object lock = new Object();
private final DruidCoordinatorConfig config;
@ -249,7 +266,8 @@ public class DruidCoordinator
return retVal;
}
-  CountingMap<String> getLoadPendingDatasources() {
+  CountingMap<String> getLoadPendingDatasources()
+  {
final CountingMap<String> retVal = new CountingMap<>();
for (LoadQueuePeon peon : loadManagementPeons.values()) {
for (DataSegment segment : peon.getSegmentsToLoad()) {
@ -386,7 +404,7 @@ public class DruidCoordinator
public void execute()
{
try {
-          if (curator.checkExists().forPath(toServedSegPath) != null &&
+          if (curator.checkExists().forPath(toServedSegPath) != null &&
curator.checkExists().forPath(toLoadQueueSegPath) == null &&
!dropPeon.getSegmentsToDrop().contains(segment)) {
dropPeon.dropSegment(segment, callback);
@ -411,7 +429,7 @@ public class DruidCoordinator
public Set<DataSegment> getOrderedAvailableDataSegments()
{
-    Set<DataSegment> availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator()));
+    Set<DataSegment> availableSegments = Sets.newTreeSet(SEGMENT_COMPARATOR);
Iterable<DataSegment> dataSegments = getAvailableDataSegments();
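
The .compound(Ordering.<DataSegment>natural()) step above is the "same interval" fix from the commit message: sorted collections like the TreeSet here (and the ConcurrentSkipListMap queues later in this diff) treat elements that compare as equal as duplicates, so ordering by interval alone would silently drop one of two distinct segments covering the same interval. DataSegment is Comparable, so its natural ordering breaks the tie. A self-contained illustration (not part of the commit), with string length standing in for the interval:

    import com.google.common.base.Function;
    import com.google.common.collect.Ordering;
    import com.google.common.collect.Sets;

    import java.util.TreeSet;

    public class CompoundOrderingSketch
    {
      public static void main(String[] args)
      {
        // Order by length only: "aa" and "bb" compare as equal.
        Ordering<String> byLength = Ordering.<Integer>natural().onResultOf(
            new Function<String, Integer>()
            {
              @Override
              public Integer apply(String s)
              {
                return s.length();
              }
            }
        );

        TreeSet<String> lossy = Sets.newTreeSet(byLength);
        lossy.add("aa");
        lossy.add("bb");                   // dropped: compares equal to "aa"
        System.out.println(lossy.size());  // 1

        // Compound with natural ordering, as SEGMENT_COMPARATOR does,
        // and the tie is broken instead of collapsed.
        TreeSet<String> lossless = Sets.newTreeSet(byLength.compound(Ordering.<String>natural()));
        lossless.add("aa");
        lossless.add("bb");                   // kept
        System.out.println(lossless.size());  // 2
      }
    }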

DruidCoordinatorRuntimeParams.java

@ -199,7 +199,7 @@ public class DruidCoordinatorRuntimeParams
this.databaseRuleManager = null;
this.segmentReplicantLookup = null;
this.dataSources = Sets.newHashSet();
-      this.availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator()));
+      this.availableSegments = Sets.newTreeSet(DruidCoordinator.SEGMENT_COMPARATOR);
this.loadManagementPeons = Maps.newHashMap();
this.replicationManager = null;
this.emitter = null;

LoadQueuePeon.java

@ -57,8 +57,6 @@ public class LoadQueuePeon
private static final int DROP = 0;
private static final int LOAD = 1;
-  private static Comparator<DataSegment> segmentComparator = Comparators.inverse(DataSegment.bucketMonthComparator());
private static void executeCallbacks(List<LoadPeonCallback> callbacks)
{
for (LoadPeonCallback callback : callbacks) {
@ -79,10 +77,10 @@ public class LoadQueuePeon
private final AtomicInteger failedAssignCount = new AtomicInteger(0);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
-      segmentComparator
+      DruidCoordinator.SEGMENT_COMPARATOR
);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
-      segmentComparator
+      DruidCoordinator.SEGMENT_COMPARATOR
);
private final Object lock = new Object();
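
Dropping the peon's private comparator in favor of the shared one keeps both queues consistent with the coordinator's ordering. Since a ConcurrentSkipListMap iterates its entries in key-comparator order, the peon now serves load and drop requests newest-interval-first. A small sketch (not part of the commit) of that property, with reverse-ordered integers standing in for the comparator:

    import java.util.Collections;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class SkipListOrderSketch
    {
      public static void main(String[] args)
      {
        // Keys come back in comparator order, regardless of insertion order.
        ConcurrentSkipListMap<Integer, String> queue =
            new ConcurrentSkipListMap<>(Collections.<Integer>reverseOrder());
        queue.put(1, "oldest");
        queue.put(3, "newest");
        queue.put(2, "middle");
        System.out.println(queue.firstKey());  // 3: the "latest" entry is served first
        System.out.println(queue.keySet());    // [3, 2, 1]
      }
    }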

DruidCoordinatorTest.java

@ -23,7 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
+import com.google.common.collect.Maps;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
@ -61,6 +63,7 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
@ -375,4 +378,45 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.verify(serverInventoryView);
EasyMock.verify(metadataRuleManager);
}
+  @Test
+  public void testOrderedAvailableDataSegments()
+  {
+    DruidDataSource dataSource = new DruidDataSource("test", new HashMap());
+    DataSegment[] segments = new DataSegment[]{
+        getSegment("test", new Interval("2016-01-10T03:00:00Z/2016-01-10T04:00:00Z")),
+        getSegment("test", new Interval("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z")),
+        getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z")),
+        getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z"))
+    };
+    for (DataSegment segment : segments) {
+      dataSource.addSegment(segment.getIdentifier(), segment);
+    }
+    EasyMock.expect(databaseSegmentManager.getInventory()).andReturn(
+        ImmutableList.of(dataSource)
+    ).atLeastOnce();
+    EasyMock.replay(databaseSegmentManager);
+    Set<DataSegment> availableSegments = coordinator.getOrderedAvailableDataSegments();
+    // expected: descending by interval end, then by start (see SEGMENT_COMPARATOR)
+    DataSegment[] expected = new DataSegment[]{
+        getSegment("test", new Interval("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z")),
+        getSegment("test", new Interval("2016-01-10T03:00:00Z/2016-01-10T04:00:00Z")),
+        getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z")),
+        getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z"))
+    };
+    Assert.assertEquals(expected.length, availableSegments.size());
+    Assert.assertEquals(expected, availableSegments.toArray());
+    EasyMock.verify(databaseSegmentManager);
+  }
+
+  private DataSegment getSegment(String dataSource, Interval interval)
+  {
+    // Not using EasyMock, as it hampers performance in multithreaded tests.
+    DataSegment segment = new DataSegment(
+        dataSource, interval, "dummy_version", Maps.<String, Object>newConcurrentMap(),
+        Lists.<String>newArrayList(), Lists.<String>newArrayList(), null, 0, 0L
+    );
+    return segment;
+  }
}

LoadQueuePeonTest.java

@ -137,9 +137,27 @@ public class LoadQueuePeonTest extends CuratorTestBase
    final List<DataSegment> segmentToLoad = Lists.transform(
        ImmutableList.<String>of(
            "2014-10-27T00:00:00Z/P1D",
            "2014-10-29T00:00:00Z/P1M",
            "2014-10-31T00:00:00Z/P1D",
            "2014-10-30T00:00:00Z/P1D",
            "2014-10-29T00:00:00Z/P1D",
            "2014-10-28T00:00:00Z/P1D"
        ), new Function<String, DataSegment>()
        {
          @Override
          public DataSegment apply(String intervalStr)
          {
            return dataSegmentWithInterval(intervalStr);
          }
        }
    );
+    // the segment with the latest interval end should be loaded first
+    final List<DataSegment> expectedLoadOrder = Lists.transform(
+        ImmutableList.<String>of(
+            "2014-10-29T00:00:00Z/P1M",
+            "2014-10-31T00:00:00Z/P1D",
+            "2014-10-30T00:00:00Z/P1D",
+            "2014-10-29T00:00:00Z/P1D",
+            "2014-10-28T00:00:00Z/P1D",
+            "2014-10-27T00:00:00Z/P1D"
+        ), new Function<String, DataSegment>()
@ -235,7 +253,7 @@ public class LoadQueuePeonTest extends CuratorTestBase
}
}
-    for (DataSegment segment : segmentToLoad) {
+    for (DataSegment segment : expectedLoadOrder) {
String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getIdentifier());
Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal[requestSignalIdx.get()]));
Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));
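
Note the first expected request: the month-long 2014-10-29T00:00:00Z/P1M segment ends latest (2014-11-29), so under the end-then-start ordering it loads before 2014-10-31T00:00:00Z/P1D even though the latter starts later.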