mirror of https://github.com/apache/druid.git
segment interface refactor for proposal 2965 (#2990)
This commit is contained in:
parent
a004f1e1c5
commit
e662efa79f
|
@ -0,0 +1,33 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.druid.segment;
|
||||||
|
|
||||||
|
public abstract class AbstractSegment implements Segment
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public <T> T as(Class<T> clazz)
|
||||||
|
{
|
||||||
|
if (clazz.equals(QueryableIndex.class)) {
|
||||||
|
return (T) asQueryableIndex();
|
||||||
|
} else if (clazz.equals(StorageAdapter.class)) {
|
||||||
|
return (T) asStorageAdapter();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,13 +21,14 @@ package io.druid.segment;
|
||||||
|
|
||||||
import io.druid.segment.incremental.IncrementalIndex;
|
import io.druid.segment.incremental.IncrementalIndex;
|
||||||
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
||||||
|
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class IncrementalIndexSegment implements Segment
|
public class IncrementalIndexSegment extends AbstractSegment
|
||||||
{
|
{
|
||||||
private final IncrementalIndex index;
|
private final IncrementalIndex index;
|
||||||
private final String segmentIdentifier;
|
private final String segmentIdentifier;
|
||||||
|
|
|
@ -25,7 +25,7 @@ import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class QueryableIndexSegment implements Segment
|
public class QueryableIndexSegment extends AbstractSegment
|
||||||
{
|
{
|
||||||
private final QueryableIndex index;
|
private final QueryableIndex index;
|
||||||
private final String identifier;
|
private final String identifier;
|
||||||
|
|
|
@ -20,13 +20,14 @@
|
||||||
package io.druid.segment;
|
package io.druid.segment;
|
||||||
|
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
|
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
public class ReferenceCountingSegment implements Segment
|
public class ReferenceCountingSegment extends AbstractSegment
|
||||||
{
|
{
|
||||||
private static final EmittingLogger log = new EmittingLogger(ReferenceCountingSegment.class);
|
private static final EmittingLogger log = new EmittingLogger(ReferenceCountingSegment.class);
|
||||||
|
|
||||||
|
|
|
@ -31,4 +31,23 @@ public interface Segment extends Closeable
|
||||||
public Interval getDataInterval();
|
public Interval getDataInterval();
|
||||||
public QueryableIndex asQueryableIndex();
|
public QueryableIndex asQueryableIndex();
|
||||||
public StorageAdapter asStorageAdapter();
|
public StorageAdapter asStorageAdapter();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request an implementation of a particular interface.
|
||||||
|
*
|
||||||
|
* If the passed-in interface is {@link QueryableIndex} or {@link StorageAdapter}, then this method behaves
|
||||||
|
* identically to {@link #asQueryableIndex()} or {@link #asStorageAdapter()}. Other interfaces are only
|
||||||
|
* expected to be requested by callers that have specific knowledge of extra features provided by specific
|
||||||
|
* segment types. For example, an extension might provide a custom Segment type that can offer both
|
||||||
|
* StorageAdapter and some new interface. That extension can also offer a Query that uses that new interface.
|
||||||
|
*
|
||||||
|
* Implementations which accept classes other than {@link QueryableIndex} or {@link StorageAdapter} are limited
|
||||||
|
* to using those classes within the extension. This means that one extension cannot rely on the `Segment.as`
|
||||||
|
* behavior of another extension.
|
||||||
|
*
|
||||||
|
* @param clazz desired interface
|
||||||
|
* @param <T> desired interface
|
||||||
|
* @return instance of clazz, or null if the interface is not supported by this segment
|
||||||
|
*/
|
||||||
|
public <T> T as(Class<T> clazz);
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
|
||||||
import com.google.common.io.Files;
|
import com.google.common.io.Files;
|
||||||
import com.metamx.common.guava.Sequence;
|
import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
|
|
||||||
import io.druid.data.input.Row;
|
import io.druid.data.input.Row;
|
||||||
import io.druid.data.input.impl.CSVParseSpec;
|
import io.druid.data.input.impl.CSVParseSpec;
|
||||||
import io.druid.data.input.impl.DimensionsSpec;
|
import io.druid.data.input.impl.DimensionsSpec;
|
||||||
|
@ -55,9 +56,11 @@ import io.druid.segment.IncrementalIndexSegment;
|
||||||
import io.druid.segment.IndexSpec;
|
import io.druid.segment.IndexSpec;
|
||||||
import io.druid.segment.QueryableIndex;
|
import io.druid.segment.QueryableIndex;
|
||||||
import io.druid.segment.QueryableIndexSegment;
|
import io.druid.segment.QueryableIndexSegment;
|
||||||
|
import io.druid.segment.Segment;
|
||||||
import io.druid.segment.TestHelper;
|
import io.druid.segment.TestHelper;
|
||||||
import io.druid.segment.incremental.IncrementalIndex;
|
import io.druid.segment.incremental.IncrementalIndex;
|
||||||
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
@ -150,7 +153,7 @@ public class MultiValuedDimensionTest
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
Sequence<Row> result = helper.runQueryOnSegmentsObjs(
|
Sequence<Row> result = helper.runQueryOnSegmentsObjs(
|
||||||
ImmutableList.of(
|
ImmutableList.<Segment>of(
|
||||||
new QueryableIndexSegment("sid1", queryableIndex),
|
new QueryableIndexSegment("sid1", queryableIndex),
|
||||||
new IncrementalIndexSegment(incrementalIndex, "sid2")
|
new IncrementalIndexSegment(incrementalIndex, "sid2")
|
||||||
),
|
),
|
||||||
|
@ -170,7 +173,7 @@ public class MultiValuedDimensionTest
|
||||||
TestHelper.assertExpectedObjects(expectedResults, Sequences.toList(result, new ArrayList<Row>()), "");
|
TestHelper.assertExpectedObjects(expectedResults, Sequences.toList(result, new ArrayList<Row>()), "");
|
||||||
|
|
||||||
result = helper.runQueryOnSegmentsObjs(
|
result = helper.runQueryOnSegmentsObjs(
|
||||||
ImmutableList.of(
|
ImmutableList.<Segment>of(
|
||||||
new QueryableIndexSegment("sid1", queryableIndex),
|
new QueryableIndexSegment("sid1", queryableIndex),
|
||||||
new IncrementalIndexSegment(incrementalIndex, "sid2")
|
new IncrementalIndexSegment(incrementalIndex, "sid2")
|
||||||
),
|
),
|
||||||
|
@ -201,7 +204,7 @@ public class MultiValuedDimensionTest
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
Sequence<Row> result = helper.runQueryOnSegmentsObjs(
|
Sequence<Row> result = helper.runQueryOnSegmentsObjs(
|
||||||
ImmutableList.of(
|
ImmutableList.<Segment>of(
|
||||||
new QueryableIndexSegment("sid1", queryableIndex),
|
new QueryableIndexSegment("sid1", queryableIndex),
|
||||||
new IncrementalIndexSegment(incrementalIndex, "sid2")
|
new IncrementalIndexSegment(incrementalIndex, "sid2")
|
||||||
),
|
),
|
||||||
|
@ -249,7 +252,7 @@ public class MultiValuedDimensionTest
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
Sequence<Row> result = helper.runQueryOnSegmentsObjs(
|
Sequence<Row> result = helper.runQueryOnSegmentsObjs(
|
||||||
ImmutableList.of(
|
ImmutableList.<Segment>of(
|
||||||
new QueryableIndexSegment("sid1", queryableIndex),
|
new QueryableIndexSegment("sid1", queryableIndex),
|
||||||
new IncrementalIndexSegment(incrementalIndex, "sid2")
|
new IncrementalIndexSegment(incrementalIndex, "sid2")
|
||||||
),
|
),
|
||||||
|
|
|
@ -41,6 +41,7 @@ import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
import com.metamx.common.guava.Yielder;
|
import com.metamx.common.guava.Yielder;
|
||||||
import com.metamx.common.guava.YieldingAccumulator;
|
import com.metamx.common.guava.YieldingAccumulator;
|
||||||
|
|
||||||
import io.druid.collections.StupidPool;
|
import io.druid.collections.StupidPool;
|
||||||
import io.druid.data.input.Row;
|
import io.druid.data.input.Row;
|
||||||
import io.druid.data.input.impl.InputRowParser;
|
import io.druid.data.input.impl.InputRowParser;
|
||||||
|
@ -63,6 +64,7 @@ import io.druid.query.groupby.GroupByQueryRunnerFactory;
|
||||||
import io.druid.query.select.SelectQueryEngine;
|
import io.druid.query.select.SelectQueryEngine;
|
||||||
import io.druid.query.select.SelectQueryQueryToolChest;
|
import io.druid.query.select.SelectQueryQueryToolChest;
|
||||||
import io.druid.query.select.SelectQueryRunnerFactory;
|
import io.druid.query.select.SelectQueryRunnerFactory;
|
||||||
|
import io.druid.segment.AbstractSegment;
|
||||||
import io.druid.segment.IndexIO;
|
import io.druid.segment.IndexIO;
|
||||||
import io.druid.segment.IndexMerger;
|
import io.druid.segment.IndexMerger;
|
||||||
import io.druid.segment.IndexSpec;
|
import io.druid.segment.IndexSpec;
|
||||||
|
@ -72,6 +74,7 @@ import io.druid.segment.Segment;
|
||||||
import io.druid.segment.column.ColumnConfig;
|
import io.druid.segment.column.ColumnConfig;
|
||||||
import io.druid.segment.incremental.IncrementalIndex;
|
import io.druid.segment.incremental.IncrementalIndex;
|
||||||
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.io.LineIterator;
|
import org.apache.commons.io.LineIterator;
|
||||||
import org.junit.rules.TemporaryFolder;
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
|
@ -43,7 +43,7 @@ public class ReferenceCountingSegmentTest
|
||||||
public void setUp() throws Exception
|
public void setUp() throws Exception
|
||||||
{
|
{
|
||||||
segment = new ReferenceCountingSegment(
|
segment = new ReferenceCountingSegment(
|
||||||
new Segment()
|
new AbstractSegment()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public String getIdentifier()
|
public String getIdentifier()
|
||||||
|
|
|
@ -20,10 +20,13 @@
|
||||||
package io.druid.segment.loading;
|
package io.druid.segment.loading;
|
||||||
|
|
||||||
import com.metamx.common.MapUtils;
|
import com.metamx.common.MapUtils;
|
||||||
|
|
||||||
|
import io.druid.segment.AbstractSegment;
|
||||||
import io.druid.segment.QueryableIndex;
|
import io.druid.segment.QueryableIndex;
|
||||||
import io.druid.segment.Segment;
|
import io.druid.segment.Segment;
|
||||||
import io.druid.segment.StorageAdapter;
|
import io.druid.segment.StorageAdapter;
|
||||||
import io.druid.timeline.DataSegment;
|
import io.druid.timeline.DataSegment;
|
||||||
|
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -49,7 +52,7 @@ public class CacheTestSegmentLoader implements SegmentLoader
|
||||||
@Override
|
@Override
|
||||||
public Segment getSegment(final DataSegment segment) throws SegmentLoadingException
|
public Segment getSegment(final DataSegment segment) throws SegmentLoadingException
|
||||||
{
|
{
|
||||||
return new Segment()
|
return new AbstractSegment()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public String getIdentifier()
|
public String getIdentifier()
|
||||||
|
|
|
@ -37,10 +37,11 @@ import com.metamx.common.guava.YieldingAccumulator;
|
||||||
import com.metamx.common.guava.YieldingSequenceBase;
|
import com.metamx.common.guava.YieldingSequenceBase;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
|
|
||||||
import io.druid.client.cache.CacheConfig;
|
import io.druid.client.cache.CacheConfig;
|
||||||
import io.druid.client.cache.LocalCacheProvider;
|
import io.druid.client.cache.LocalCacheProvider;
|
||||||
import io.druid.granularity.QueryGranularity;
|
|
||||||
import io.druid.granularity.QueryGranularities;
|
import io.druid.granularity.QueryGranularities;
|
||||||
|
import io.druid.granularity.QueryGranularity;
|
||||||
import io.druid.jackson.DefaultObjectMapper;
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
import io.druid.query.ConcatQueryRunner;
|
import io.druid.query.ConcatQueryRunner;
|
||||||
import io.druid.query.Druids;
|
import io.druid.query.Druids;
|
||||||
|
@ -54,6 +55,7 @@ import io.druid.query.Result;
|
||||||
import io.druid.query.aggregation.MetricManipulationFn;
|
import io.druid.query.aggregation.MetricManipulationFn;
|
||||||
import io.druid.query.search.SearchResultValue;
|
import io.druid.query.search.SearchResultValue;
|
||||||
import io.druid.query.search.search.SearchQuery;
|
import io.druid.query.search.search.SearchQuery;
|
||||||
|
import io.druid.segment.AbstractSegment;
|
||||||
import io.druid.segment.IndexIO;
|
import io.druid.segment.IndexIO;
|
||||||
import io.druid.segment.QueryableIndex;
|
import io.druid.segment.QueryableIndex;
|
||||||
import io.druid.segment.ReferenceCountingSegment;
|
import io.druid.segment.ReferenceCountingSegment;
|
||||||
|
@ -64,6 +66,7 @@ import io.druid.segment.loading.SegmentLoadingException;
|
||||||
import io.druid.server.metrics.NoopServiceEmitter;
|
import io.druid.server.metrics.NoopServiceEmitter;
|
||||||
import io.druid.timeline.DataSegment;
|
import io.druid.timeline.DataSegment;
|
||||||
import io.druid.timeline.partition.NoneShardSpec;
|
import io.druid.timeline.partition.NoneShardSpec;
|
||||||
|
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -592,7 +595,7 @@ public class ServerManagerTest
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class SegmentForTesting implements Segment
|
private static class SegmentForTesting extends AbstractSegment
|
||||||
{
|
{
|
||||||
private final String version;
|
private final String version;
|
||||||
private final Interval interval;
|
private final Interval interval;
|
||||||
|
|
Loading…
Reference in New Issue