diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java index 8cc6cf4ca83..dd59dfebd80 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java @@ -20,10 +20,12 @@ package org.apache.druid.msq.input.table; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.druid.msq.input.InputSlice; +import javax.annotation.Nullable; import java.util.List; import java.util.Objects; @@ -50,7 +52,7 @@ public class SegmentsInputSlice implements InputSlice public SegmentsInputSlice( @JsonProperty("dataSource") String dataSource, @JsonProperty("segments") List descriptors, - @JsonProperty("servedSegments") List servedSegments + @JsonProperty("servedSegments") @Nullable List servedSegments ) { this.dataSource = dataSource; @@ -70,7 +72,9 @@ public class SegmentsInputSlice implements InputSlice return descriptors; } + @Nullable @JsonProperty("servedSegments") + @JsonInclude(JsonInclude.Include.NON_NULL) public List getServedSegments() { return servedSegments; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java index 648bd95c6c3..dcc562ebbf8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java @@ -57,7 +57,8 @@ public class SegmentsInputSliceReader implements InputSliceReader public int numReadableInputs(InputSlice slice) { final SegmentsInputSlice segmentsInputSlice = (SegmentsInputSlice) slice; - return segmentsInputSlice.getDescriptors().size() + segmentsInputSlice.getServedSegments().size(); + final int servedSegmentsSize = segmentsInputSlice.getServedSegments() == null ? 0 : segmentsInputSlice.getServedSegments().size(); + return segmentsInputSlice.getDescriptors().size() + servedSegmentsSize; } @Override @@ -78,15 +79,19 @@ public class SegmentsInputSliceReader implements InputSliceReader counters.channel(CounterNames.inputChannel(inputNumber)).setTotalFiles(slice.fileCount()) ), ReadableInput::segment); - Iterator dataServerIterator = - Iterators.transform( - dataServerIterator( - segmentsInputSlice.getDataSource(), - segmentsInputSlice.getServedSegments(), - counters.channel(CounterNames.inputChannel(inputNumber)).setTotalFiles(slice.fileCount()) - ), ReadableInput::dataServerQuery); + if (segmentsInputSlice.getServedSegments() == null) { + return ReadableInputs.segments(() -> segmentIterator); + } else { + Iterator dataServerIterator = + Iterators.transform( + dataServerIterator( + segmentsInputSlice.getDataSource(), + segmentsInputSlice.getServedSegments(), + counters.channel(CounterNames.inputChannel(inputNumber)).setTotalFiles(slice.fileCount()) + ), ReadableInput::dataServerQuery); - return ReadableInputs.segments(() -> Iterators.concat(dataServerIterator, segmentIterator)); + return ReadableInputs.segments(() -> Iterators.concat(dataServerIterator, segmentIterator)); + } } private Iterator dataSegmentIterator( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/SegmentsInputSliceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/SegmentsInputSliceTest.java index 4f89f3cf378..da0b5dd9a66 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/SegmentsInputSliceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/SegmentsInputSliceTest.java @@ -78,6 +78,43 @@ public class SegmentsInputSliceTest ); } + @Test + public void testSerde2() throws Exception + { + final ObjectMapper mapper = TestHelper.makeJsonMapper() + .registerModules(new MSQIndexingModule().getJacksonModules()); + + final String sliceString = "{\n" + + " \"type\": \"segments\"," + + " \"dataSource\": \"myds\",\n" + + " \"segments\": [\n" + + " {\n" + + " \"itvl\": \"2000-01-01T00:00:00.000Z/2000-02-01T00:00:00.000Z\",\n" + + " \"ver\": \"1\",\n" + + " \"part\": 0\n" + + " }\n" + + " ]\n" + + "}"; + + final SegmentsInputSlice expectedSlice = new SegmentsInputSlice( + "myds", + ImmutableList.of( + new RichSegmentDescriptor( + Intervals.of("2000/P1M"), + Intervals.of("2000/P1M"), + "1", + 0 + ) + ), + null + ); + + Assert.assertEquals( + expectedSlice, + mapper.readValue(sliceString, InputSlice.class) + ); + } + @Test public void testEquals() {