Fall back to the old coordinator API for checking segment handoff if the new one is not supported (#6966)

Authored by Jihoon Son on 2019-01-31 08:50:46 -08:00; committed by Fangjin Yang
parent 9eaf8f5304
commit e56c598cc1
3 changed files with 257 additions and 4 deletions

org/apache/druid/client/coordinator/CoordinatorClient.java

@@ -21,8 +21,8 @@ package org.apache.druid.client.coordinator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -30,6 +30,10 @@ import org.apache.druid.java.util.http.client.response.FullResponseHolder;
import org.apache.druid.query.SegmentDescriptor;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
public class CoordinatorClient
{
@@ -46,7 +50,12 @@ public class CoordinatorClient
this.druidLeaderClient = druidLeaderClient;
}
public boolean isHandOffComplete(String dataSource, SegmentDescriptor descriptor)
/**
* Checks whether the given segment has been handed off or not.
* Returns null if the HTTP call returns 404, which can happen during a rolling update.
*/
@Nullable
public Boolean isHandOffComplete(String dataSource, SegmentDescriptor descriptor)
{
try {
FullResponseHolder response = druidLeaderClient.go(
@@ -62,6 +71,10 @@ public class CoordinatorClient
)
);
if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
return null;
}
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while fetching serverView status[%s] content[%s]",
@@ -74,7 +87,38 @@ public class CoordinatorClient
});
}
catch (Exception e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, Interval interval, boolean incompleteOk)
{
try {
FullResponseHolder response = druidLeaderClient.go(
druidLeaderClient.makeRequest(HttpMethod.GET,
StringUtils.format(
"/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
dataSource,
interval.toString().replace('/', '_'),
incompleteOk
))
);
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while fetching serverView status[%s] content[%s]",
response.getStatus(),
response.getContent()
);
}
return jsonMapper.readValue(
response.getContent(), new TypeReference<List<ImmutableSegmentLoadInfo>>()
{
}
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
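
The signature change from boolean to Boolean makes the contract three-valued: TRUE and FALSE are real answers from the new coordinator endpoint, while null surfaces a 404 and means the coordinator does not expose that endpoint yet. Below is a minimal sketch of the intended calling pattern; the notifier change in the next file is the real implementation, and the helper class here is hypothetical, not part of this commit:

package org.apache.druid.segment.realtime.plumber;

import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.query.SegmentDescriptor;

// Hypothetical helper, not part of this commit: spells out the calling
// pattern for the now-nullable isHandOffComplete().
class HandoffCheckSketch
{
  static boolean isSegmentHandedOff(
      CoordinatorClient coordinatorClient,
      String dataSource,
      SegmentDescriptor descriptor
  )
  {
    // TRUE/FALSE is an authoritative answer from the new coordinator endpoint.
    final Boolean viaNewApi = coordinatorClient.isHandOffComplete(dataSource, descriptor);
    if (viaNewApi != null) {
      return viaNewApi;
    }
    // null means the coordinator returned 404, i.e. it predates the new
    // endpoint (possible mid rolling update): fall back to the old
    // serverview API and evaluate the handoff condition on the caller side.
    return CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
        coordinatorClient.fetchServerView(dataSource, descriptor.getInterval(), true),
        descriptor
    );
  }
}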

org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java

@@ -19,13 +19,16 @@
package org.apache.druid.segment.realtime.plumber;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.server.coordination.DruidServerMetadata;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -90,7 +93,19 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNotifier
Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry = itr.next();
SegmentDescriptor descriptor = entry.getKey();
try {
if (coordinatorClient.isHandOffComplete(dataSource, descriptor)) {
Boolean handOffComplete = coordinatorClient.isHandOffComplete(dataSource, descriptor);
if (handOffComplete == null) {
log.warn(
"Failed to call the new coordinator API for checking segment handoff. Falling back to the old API"
);
final List<ImmutableSegmentLoadInfo> loadedSegments = coordinatorClient.fetchServerView(
dataSource,
descriptor.getInterval(),
true
);
handOffComplete = isHandOffComplete(loadedSegments, descriptor);
}
if (handOffComplete) {
log.info("Segment Handoff complete for dataSource[%s] Segment[%s]", dataSource, descriptor);
entry.getValue().lhs.execute(entry.getValue().rhs);
itr.remove();
@@ -120,6 +135,20 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNotifier
}
}
static boolean isHandOffComplete(List<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor)
{
for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval())
&& segmentLoadInfo.getSegment().getShardSpec().getPartitionNum()
== descriptor.getPartitionNumber()
&& segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
&& segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::segmentReplicatable)) {
return true;
}
}
return false;
}
@Override
public void close()
{

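The fallback reuses the long-standing serverview endpoint shown above. A small self-contained sketch (the dataSource name "wiki" is an assumed example value) shows the exact path fetchServerView() builds; the slash inside the interval is replaced with an underscore so the interval survives as a single path segment, and partial=true because an incomplete view is acceptable for this check:

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.Interval;

public class ServerViewUrlSketch
{
  public static void main(String[] args)
  {
    final Interval interval = Intervals.of("2011-04-01/2011-04-02");
    // Same format string as CoordinatorClient#fetchServerView.
    final String path = StringUtils.format(
        "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
        "wiki",
        interval.toString().replace('/', '_'),
        true
    );
    // Prints:
    // /druid/coordinator/v1/datasources/wiki/intervals/2011-04-01T00:00:00.000Z_2011-04-02T00:00:00.000Z/serverview?partial=true
    System.out.println(path);
  }
}
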
org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java

@@ -19,16 +19,23 @@
package org.apache.druid.segment.realtime.plumber;
import com.google.common.collect.Sets;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
public class CoordinatorBasedSegmentHandoffNotifierTest
@@ -104,4 +111,177 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
Assert.assertTrue(callbackCalled.get());
EasyMock.verify(coordinatorClient);
}
@Test
public void testHandoffChecksForVersion()
{
Interval interval = Intervals.of(
"2011-04-01/2011-04-02"
);
Assert.assertFalse(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v2", 2)
)
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v2", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
}
@Test
public void testHandoffChecksForAssignableServer()
{
Interval interval = Intervals.of(
"2011-04-01/2011-04-02"
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
Assert.assertFalse(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createRealtimeServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
}
@Test
public void testHandoffChecksForPartitionNumber()
{
Interval interval = Intervals.of(
"2011-04-01/2011-04-02"
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 1),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 1)
)
);
Assert.assertFalse(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 1),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
}
@Test
public void testHandoffChecksForInterval()
{
Assert.assertFalse(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1", 1),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1", 1)
)
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1", 1),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1", 1)
)
);
}
private DruidServerMetadata createRealtimeServerMetadata(String name)
{
return createServerMetadata(name, ServerType.REALTIME);
}
private DruidServerMetadata createHistoricalServerMetadata(String name)
{
return createServerMetadata(name, ServerType.HISTORICAL);
}
private DruidServerMetadata createServerMetadata(String name, ServerType type)
{
return new DruidServerMetadata(
name,
name,
null,
10000,
type,
"tier",
1
);
}
private DataSegment createSegment(Interval interval, String version, int partitionNumber)
{
return new DataSegment(
"test_ds",
interval,
version,
null,
null,
null,
new NumberedShardSpec(partitionNumber, 100),
0, 0
);
}
}
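
The new tests above pin down the matching rules of the old-API predicate (version, server type, partition number, interval containment). A companion test for the fallback path itself is sketched below; it assumes the notifierConfig fixture and the checkForSegmentHandoffs() entry point used by the pre-existing tests in this class, so treat those names as assumptions rather than part of the diff:

@Test
public void testNullFromNewApiFallsBackToOldApi()
{
  Interval interval = Intervals.of("2011-04-01/2011-04-02");
  SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2);
  CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
  // New API: the coordinator answers 404, surfaced to the caller as null.
  EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor))
          .andReturn(null)
          .once();
  // Old API: a historical serves the segment, so handoff is complete.
  EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true))
          .andReturn(
              Collections.singletonList(
                  new ImmutableSegmentLoadInfo(
                      createSegment(interval, "v1", 2),
                      Sets.newHashSet(createHistoricalServerMetadata("a"))
                  )
              )
          )
          .once();
  EasyMock.replay(coordinatorClient);
  CoordinatorBasedSegmentHandoffNotifier notifier =
      new CoordinatorBasedSegmentHandoffNotifier("test_ds", coordinatorClient, notifierConfig);
  AtomicBoolean callbackCalled = new AtomicBoolean(false);
  notifier.registerSegmentHandoffCallback(descriptor, Execs.directExecutor(), () -> callbackCalled.set(true));
  notifier.checkForSegmentHandoffs();
  Assert.assertTrue(callbackCalled.get());
  EasyMock.verify(coordinatorClient);
}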