mirror of
https://github.com/apache/druid.git
synced 2025-02-17 07:25:02 +00:00
Make servedSegments nullable to maintain compatibility (#16034)
* Make servedSegments nullable to maintain compatibility
This commit is contained in:
parent
65c3b4d31a
commit
ddd9da2e09
@ -20,10 +20,12 @@
|
|||||||
package org.apache.druid.msq.input.table;
|
package org.apache.druid.msq.input.table;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||||
import org.apache.druid.msq.input.InputSlice;
|
import org.apache.druid.msq.input.InputSlice;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
@ -50,7 +52,7 @@ public class SegmentsInputSlice implements InputSlice
|
|||||||
public SegmentsInputSlice(
|
public SegmentsInputSlice(
|
||||||
@JsonProperty("dataSource") String dataSource,
|
@JsonProperty("dataSource") String dataSource,
|
||||||
@JsonProperty("segments") List<RichSegmentDescriptor> descriptors,
|
@JsonProperty("segments") List<RichSegmentDescriptor> descriptors,
|
||||||
@JsonProperty("servedSegments") List<DataServerRequestDescriptor> servedSegments
|
@JsonProperty("servedSegments") @Nullable List<DataServerRequestDescriptor> servedSegments
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.dataSource = dataSource;
|
this.dataSource = dataSource;
|
||||||
@ -70,7 +72,9 @@ public class SegmentsInputSlice implements InputSlice
|
|||||||
return descriptors;
|
return descriptors;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
@JsonProperty("servedSegments")
|
@JsonProperty("servedSegments")
|
||||||
|
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||||
public List<DataServerRequestDescriptor> getServedSegments()
|
public List<DataServerRequestDescriptor> getServedSegments()
|
||||||
{
|
{
|
||||||
return servedSegments;
|
return servedSegments;
|
||||||
|
@ -57,7 +57,8 @@ public class SegmentsInputSliceReader implements InputSliceReader
|
|||||||
public int numReadableInputs(InputSlice slice)
|
public int numReadableInputs(InputSlice slice)
|
||||||
{
|
{
|
||||||
final SegmentsInputSlice segmentsInputSlice = (SegmentsInputSlice) slice;
|
final SegmentsInputSlice segmentsInputSlice = (SegmentsInputSlice) slice;
|
||||||
return segmentsInputSlice.getDescriptors().size() + segmentsInputSlice.getServedSegments().size();
|
final int servedSegmentsSize = segmentsInputSlice.getServedSegments() == null ? 0 : segmentsInputSlice.getServedSegments().size();
|
||||||
|
return segmentsInputSlice.getDescriptors().size() + servedSegmentsSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -78,15 +79,19 @@ public class SegmentsInputSliceReader implements InputSliceReader
|
|||||||
counters.channel(CounterNames.inputChannel(inputNumber)).setTotalFiles(slice.fileCount())
|
counters.channel(CounterNames.inputChannel(inputNumber)).setTotalFiles(slice.fileCount())
|
||||||
), ReadableInput::segment);
|
), ReadableInput::segment);
|
||||||
|
|
||||||
Iterator<ReadableInput> dataServerIterator =
|
if (segmentsInputSlice.getServedSegments() == null) {
|
||||||
Iterators.transform(
|
return ReadableInputs.segments(() -> segmentIterator);
|
||||||
dataServerIterator(
|
} else {
|
||||||
segmentsInputSlice.getDataSource(),
|
Iterator<ReadableInput> dataServerIterator =
|
||||||
segmentsInputSlice.getServedSegments(),
|
Iterators.transform(
|
||||||
counters.channel(CounterNames.inputChannel(inputNumber)).setTotalFiles(slice.fileCount())
|
dataServerIterator(
|
||||||
), ReadableInput::dataServerQuery);
|
segmentsInputSlice.getDataSource(),
|
||||||
|
segmentsInputSlice.getServedSegments(),
|
||||||
|
counters.channel(CounterNames.inputChannel(inputNumber)).setTotalFiles(slice.fileCount())
|
||||||
|
), ReadableInput::dataServerQuery);
|
||||||
|
|
||||||
return ReadableInputs.segments(() -> Iterators.concat(dataServerIterator, segmentIterator));
|
return ReadableInputs.segments(() -> Iterators.concat(dataServerIterator, segmentIterator));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Iterator<SegmentWithDescriptor> dataSegmentIterator(
|
private Iterator<SegmentWithDescriptor> dataSegmentIterator(
|
||||||
|
@ -78,6 +78,43 @@ public class SegmentsInputSliceTest
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSerde2() throws Exception
|
||||||
|
{
|
||||||
|
final ObjectMapper mapper = TestHelper.makeJsonMapper()
|
||||||
|
.registerModules(new MSQIndexingModule().getJacksonModules());
|
||||||
|
|
||||||
|
final String sliceString = "{\n"
|
||||||
|
+ " \"type\": \"segments\","
|
||||||
|
+ " \"dataSource\": \"myds\",\n"
|
||||||
|
+ " \"segments\": [\n"
|
||||||
|
+ " {\n"
|
||||||
|
+ " \"itvl\": \"2000-01-01T00:00:00.000Z/2000-02-01T00:00:00.000Z\",\n"
|
||||||
|
+ " \"ver\": \"1\",\n"
|
||||||
|
+ " \"part\": 0\n"
|
||||||
|
+ " }\n"
|
||||||
|
+ " ]\n"
|
||||||
|
+ "}";
|
||||||
|
|
||||||
|
final SegmentsInputSlice expectedSlice = new SegmentsInputSlice(
|
||||||
|
"myds",
|
||||||
|
ImmutableList.of(
|
||||||
|
new RichSegmentDescriptor(
|
||||||
|
Intervals.of("2000/P1M"),
|
||||||
|
Intervals.of("2000/P1M"),
|
||||||
|
"1",
|
||||||
|
0
|
||||||
|
)
|
||||||
|
),
|
||||||
|
null
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
expectedSlice,
|
||||||
|
mapper.readValue(sliceString, InputSlice.class)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEquals()
|
public void testEquals()
|
||||||
{
|
{
|
||||||
|
Loading…
x
Reference in New Issue
Block a user