rejiggering druid api

This commit is contained in:
fjy 2014-02-14 12:57:52 -08:00
parent 0f6af72ea4
commit a8c4362d72
26 changed files with 573 additions and 221 deletions
indexing-common
indexing-hadoop/src/main/java/io/druid/indexer/schema
indexing-service/src/main/java/io/druid/indexing/common/index
kafka-eight/src/main/java/io/druid/firehose/kafka
pom.xml
processing/src
main/java/io/druid/data/input
test/java/io/druid
rabbitmq/src/main/java/io/druid/firehose/rabbitmq
server/src

indexing-common/pom.xml Normal file
View File

@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012, 2013 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid</groupId>
<artifactId>druid-indexing-common</artifactId>
<name>druid-indexing-common</name>
<description>Druid Indexing Common</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<outputFile>
${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
</outputFile>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,92 @@
package io.druid.indexer.schema;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.segment.indexing.DriverConfig;
/**
*/
@JsonTypeName("batch")
public class BatchDriverConfig implements DriverConfig
{
private final String workingPath;
private final String segmentOutputPath;
private final String version;
private final PartitionsSpec partitionsSpec;
private final boolean leaveIntermediate;
private final boolean cleanupOnFailure;
private final boolean overwriteFiles;
private final boolean ignoreInvalidRows;
@JsonCreator
public BatchDriverConfig(
final @JsonProperty("workingPath") String workingPath,
final @JsonProperty("segmentOutputPath") String segmentOutputPath,
final @JsonProperty("version") String version,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows
)
{
this.workingPath = workingPath;
this.segmentOutputPath = segmentOutputPath;
this.version = version;
this.partitionsSpec = partitionsSpec;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = (cleanupOnFailure == null) ? true : cleanupOnFailure;
this.overwriteFiles = overwriteFiles;
this.ignoreInvalidRows = ignoreInvalidRows;
}
@JsonProperty
public String getWorkingPath()
{
return workingPath;
}
@JsonProperty
public String getSegmentOutputPath()
{
return segmentOutputPath;
}
@JsonProperty
public String getVersion()
{
return version;
}
@JsonProperty
public PartitionsSpec getPartitionsSpec()
{
return partitionsSpec;
}
@JsonProperty
public boolean isLeaveIntermediate()
{
return leaveIntermediate;
}
@JsonProperty
public boolean isCleanupOnFailure()
{
return cleanupOnFailure;
}
@JsonProperty
public boolean isOverwriteFiles()
{
return overwriteFiles;
}
@JsonProperty
public boolean isIgnoreInvalidRows()
{
return ignoreInvalidRows;
}
}
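For reference, a minimal sketch of constructing the new batch driver config directly, going off the @JsonCreator signature above; the paths and version string are made-up values, and the null for cleanupOnFailure shows the Boolean falling back to true:

BatchDriverConfig config = new BatchDriverConfig(
    "/tmp/druid/working",            // workingPath
    "/tmp/druid/segments",           // segmentOutputPath
    "2014-02-14T00:00:00.000Z",      // version
    null,                            // partitionsSpec
    false,                           // leaveIntermediate
    null,                            // cleanupOnFailure -> defaults to true
    false,                           // overwriteFiles
    false                            // ignoreInvalidRows
);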

View File

@ -0,0 +1,39 @@
package io.druid.indexer.schema;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.indexer.path.PathSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.segment.indexing.IOConfig;
/**
*/
@JsonTypeName("batch")
public class BatchIOConfig implements IOConfig
{
private final PathSpec pathSpec;
private final DbUpdaterJobSpec updaterJobSpec;
@JsonCreator
public BatchIOConfig(
@JsonProperty("pathSpec") PathSpec pathSpec,
@JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec
)
{
this.pathSpec = pathSpec;
this.updaterJobSpec = updaterJobSpec;
}
@JsonProperty
public PathSpec getPathSpec()
{
return pathSpec;
}
@JsonProperty
public DbUpdaterJobSpec getUpdaterJobSpec()
{
return updaterJobSpec;
}
}
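Both batch classes carry @JsonTypeName("batch"), but the DriverConfig and IOConfig interfaces in this commit only list the realtime subtypes, so a plain Jackson mapper would presumably need the batch classes registered before a "type": "batch" block resolves. A hedged sketch under that assumption (the example class name and JSON values are made up; assumes jackson-databind plus the indexing-hadoop and server artifacts on the classpath):

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.schema.BatchDriverConfig;
import io.druid.indexer.schema.BatchIOConfig;
import io.druid.segment.indexing.DriverConfig;

public class BatchSubtypeExample
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    mapper.registerSubtypes(BatchDriverConfig.class, BatchIOConfig.class);

    // The "type" discriminator is consumed by the type resolver; the remaining
    // fields bind through BatchDriverConfig's @JsonCreator constructor.
    DriverConfig driverConfig = mapper.readValue(
        "{\"type\": \"batch\", \"workingPath\": \"/tmp/druid/working\"}",
        DriverConfig.class
    );
    System.out.println(driverConfig.getClass()); // class io.druid.indexer.schema.BatchDriverConfig
  }
}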

View File

@ -35,6 +35,7 @@ import io.druid.query.QueryRunner;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
@ -79,6 +80,12 @@ public class YeOldePlumberSchool implements PlumberSchool
this.tmpSegmentDir = tmpSegmentDir;
}
@Override
public SegmentGranularity getSegmentGranularity()
{
throw new UnsupportedOperationException();
}
@Override
public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics)
{

View File

@ -146,4 +146,10 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory
}
};
}
@Override
public ByteBufferInputRowParser getParser()
{
return parser;
}
}

View File

@ -41,7 +41,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.25.2</metamx.java-util.version>
<apache.curator.version>2.3.0</apache.curator.version>
<druid.api.version>0.1.8</druid.api.version>
<druid.api.version>0.2.0-SNAPSHOT</druid.api.version>
</properties>
<modules>

View File

@ -29,7 +29,11 @@ import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.TimestampSpec;
import java.io.InputStream;
@ -45,84 +49,104 @@ public class ProtoBufInputRowParser implements ByteBufferInputRowParser
{
private static final Logger log = new Logger(ProtoBufInputRowParser.class);
private final MapInputRowParser inputRowCreator;
private final Descriptor descriptor;
private final ParseSpec parseSpec;
private final MapInputRowParser mapParser;
private final String descriptorFileInClasspath;
@JsonCreator
public ProtoBufInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonCreator
public ProtoBufInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec,
@JsonProperty("descriptor") String descriptorFileInClasspath,
// Backwards compatible
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions,
@JsonProperty("descriptor") String descriptorFileInClasspath)
{
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
)
{
// Backwards Compatible
if (parseSpec == null) {
this.parseSpec = new ParseSpec(
timestampSpec,
new DimensionsSpec(dimensions, dimensionExclusions, spatialDimensions)
)
{
};
} else {
this.parseSpec = parseSpec;
}
descriptor = getDescriptor(descriptorFileInClasspath);
inputRowCreator = new MapInputRowParser(timestampSpec, dimensions, dimensionExclusions);
this.descriptorFileInClasspath = descriptorFileInClasspath;
this.mapParser = new MapInputRowParser(this.parseSpec, null, null, null, null);
}
}
@Override
public ParseSpec getParseSpec()
{
return parseSpec;
}
@Override
public InputRow parse(ByteBuffer input) throws FormattedException
{
// We should really create a ProtoBufBasedInputRow that does not need an intermediate map but accesses
// the DynamicMessage directly...
Map<String, Object> theMap = buildStringKeyMap(input);
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return new ProtoBufInputRowParser(parseSpec, descriptorFileInClasspath, null, null, null, null);
}
return inputRowCreator.parse(theMap);
}
@Override
public InputRow parse(ByteBuffer input) throws FormattedException
{
// We should really create a ProtoBufBasedInputRow that does not need an intermediate map but accesses
// the DynamicMessage directly...
Map<String, Object> theMap = buildStringKeyMap(input);
private Map<String, Object> buildStringKeyMap(ByteBuffer input)
{
Map<String, Object> theMap = Maps.newHashMap();
return mapParser.parse(theMap);
}
try
{
private Map<String, Object> buildStringKeyMap(ByteBuffer input)
{
final Descriptor descriptor = getDescriptor(descriptorFileInClasspath);
final Map<String, Object> theMap = Maps.newHashMap();
try {
DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input));
Map<Descriptors.FieldDescriptor, Object> allFields = message.getAllFields();
Map<Descriptors.FieldDescriptor, Object> allFields = message.getAllFields();
for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : allFields.entrySet())
{
String name = entry.getKey().getName();
if (theMap.containsKey(name))
{
continue;
// Perhaps throw an exception here?
// throw new RuntimeException("duplicate key " + name + " in " + message);
}
for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : allFields.entrySet()) {
String name = entry.getKey().getName();
if (theMap.containsKey(name)) {
continue;
// Perhaps throw an exception here?
// throw new RuntimeException("duplicate key " + name + " in " + message);
}
Object value = entry.getValue();
if(value instanceof Descriptors.EnumValueDescriptor) {
if (value instanceof Descriptors.EnumValueDescriptor) {
Descriptors.EnumValueDescriptor desc = (Descriptors.EnumValueDescriptor) value;
value = desc.getName();
}
theMap.put(name, value);
}
theMap.put(name, value);
}
} catch (InvalidProtocolBufferException e)
{
log.warn(e, "Problem with protobuf something");
}
return theMap;
}
}
catch (InvalidProtocolBufferException e) {
log.warn(e, "Problem with protobuf something");
}
return theMap;
}
private Descriptor getDescriptor(String descriptorFileInClassPath)
{
try
{
InputStream fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFileInClassPath);
FileDescriptorSet set = FileDescriptorSet.parseFrom(fin);
FileDescriptor file = FileDescriptor.buildFrom(set.getFile(0), new FileDescriptor[]
{});
return file.getMessageTypes().get(0);
} catch (Exception e)
{
throw Throwables.propagate(e);
}
}
@Override
public void addDimensionExclusion(String dimension)
{
inputRowCreator.addDimensionExclusion(dimension);
}
private Descriptor getDescriptor(String descriptorFileInClassPath)
{
try {
InputStream fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFileInClassPath);
FileDescriptorSet set = FileDescriptorSet.parseFrom(fin);
FileDescriptor file = FileDescriptor.buildFrom(
set.getFile(0), new FileDescriptor[]
{}
);
return file.getMessageTypes().get(0);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -19,6 +19,8 @@
package io.druid.data.input;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import org.joda.time.DateTime;
import org.junit.Test;
@ -30,7 +32,8 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
public class ProtoBufInputRowParserTest {
public class ProtoBufInputRowParserTest
{
public static final String[] DIMENSIONS = new String[]{"eventType", "id", "someOtherId", "isValid"};
@ -49,26 +52,35 @@ public class ProtoBufInputRowParserTest {
*/
@Test
public void testParse() throws Exception {
public void testParse() throws Exception
{
//configure parser with desc file
ProtoBufInputRowParser parser = new ProtoBufInputRowParser(new TimestampSpec("timestamp", "iso"),
Arrays.asList(DIMENSIONS), Arrays.<String>asList(), "prototest.desc");
ProtoBufInputRowParser parser = new ProtoBufInputRowParser(
new ParseSpec(
new TimestampSpec("timestamp", "iso"),
new DimensionsSpec(Arrays.asList(DIMENSIONS), Arrays.<String>asList(), null)
)
{
},
"prototest.desc",
null, null, null, null
);
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 07, 12, 9, 30);
ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.build();
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
@ -91,7 +103,8 @@ public class ProtoBufInputRowParserTest {
}
private void assertDimensionEquals(InputRow row, String dimension, Object expected) {
private void assertDimensionEquals(InputRow row, String dimension, Object expected)
{
List<String> values = row.getDimension(dimension);
assertEquals(1, values.size());
assertEquals(expected, values.get(0));

View File

@ -25,7 +25,8 @@ import com.google.common.io.CharStreams;
import com.google.common.io.InputSupplier;
import com.google.common.io.LineProcessor;
import com.metamx.common.logger.Logger;
import io.druid.data.input.impl.DelimitedDataSpec;
import io.druid.data.input.impl.DelimitedParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
@ -52,7 +53,14 @@ public class TestIndex
private static QueryableIndex mmappedIndex = null;
private static QueryableIndex mergedRealtime = null;
public static final String[] COLUMNS = new String[]{"ts", "provider", "quALIty", "plAcEmEnT", "pLacementish", "iNdEx"};
public static final String[] COLUMNS = new String[]{
"ts",
"provider",
"quALIty",
"plAcEmEnT",
"pLacementish",
"iNdEx"
};
public static final String[] DIMENSIONS = new String[]{"provider", "quALIty", "plAcEmEnT", "pLacementish"};
public static final String[] METRICS = new String[]{"iNdEx"};
private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-04-16T00:00:00.000Z");
@ -156,9 +164,13 @@ public class TestIndex
new LineProcessor<Integer>()
{
StringInputRowParser parser = new StringInputRowParser(
new TimestampSpec("ts", "iso"),
new DelimitedDataSpec("\t", Arrays.asList(COLUMNS), Arrays.asList(DIMENSIONS), null),
Arrays.<String>asList()
new DelimitedParseSpec(
new TimestampSpec("ts", "iso"),
new DimensionsSpec(Arrays.asList(COLUMNS), Arrays.asList(DIMENSIONS), null),
"\t",
Arrays.<String>asList()
),
null, null, null, null
);
boolean runOnce = false;
int lineCount = 0;

View File

@ -28,6 +28,7 @@ import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
@ -253,4 +254,10 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
}
};
}
@Override
public ByteBufferInputRowParser getParser()
{
return parser;
}
}

View File

@ -22,24 +22,17 @@ package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import java.util.List;
/**
*/
public class DataSchema
{
private final String dataSource;
private final TimestampSpec timestampSpec;
private final List<String> dimensions;
private final List<String> dimensionExclusions;
private final List<SpatialDimensionSchema> spatialDimensions;
private final ByteBufferInputRowParser parser;
private final AggregatorFactory[] aggregators;
private final GranularitySpec granularitySpec;
private final ShardSpec shardSpec;
@ -47,27 +40,19 @@ public class DataSchema
@JsonCreator
public DataSchema(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions,
@JsonProperty("metrics") AggregatorFactory[] aggregators,
@JsonProperty("parser") ByteBufferInputRowParser parser,
@JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
@JsonProperty("granularitySpec") GranularitySpec granularitySpec,
@JsonProperty("shardSpec") ShardSpec shardSpec
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(timestampSpec, "timestampSpec");
Preconditions.checkNotNull(parser, "parser");
Preconditions.checkNotNull(aggregators, "metrics");
Preconditions.checkNotNull(granularitySpec, "granularitySpec");
this.dataSource = dataSource;
this.timestampSpec = timestampSpec;
this.dimensions = dimensions;
this.dimensionExclusions = dimensionExclusions;
this.spatialDimensions = (spatialDimensions == null)
? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
this.parser = parser;
this.aggregators = aggregators;
this.granularitySpec = granularitySpec;
this.shardSpec = shardSpec == null ? new NoneShardSpec() : shardSpec;
@ -80,30 +65,12 @@ public class DataSchema
}
@JsonProperty
public TimestampSpec getTimestampSpec()
public ByteBufferInputRowParser getParser()
{
return timestampSpec;
return parser;
}
@JsonProperty
public List<String> getDimensions()
{
return dimensions;
}
@JsonProperty
public List<String> getDimensionExclusions()
{
return dimensionExclusions;
}
@JsonProperty
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
@JsonProperty
@JsonProperty("metricsSpec")
public AggregatorFactory[] getAggregators()
{
return aggregators;

View File

@ -26,7 +26,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "uniform", value = RealtimeDriverConfig.class)
@JsonSubTypes.Type(name = "realtime", value = RealtimeDriverConfig.class)
})
public interface DriverConfig
{

View File

@ -19,9 +19,15 @@
package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
*/
public interface IngestConfig
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "realtime", value = RealtimeIOConfig.class)
})
public interface IOConfig
{
}

View File

@ -27,36 +27,36 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class IngestionSchema
{
private final DataSchema dataSchema;
private final IOConfig ioConfig;
private final DriverConfig driverConfig;
private final IngestConfig ingestConfig;
@JsonCreator
public IngestionSchema(
@JsonProperty("schema") DataSchema dataSchema,
@JsonProperty("config") DriverConfig driverConfig,
@JsonProperty("ingest") IngestConfig ingestConfig
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("io") IOConfig ioConfig,
@JsonProperty("driverConfig") DriverConfig driverConfig
)
{
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.driverConfig = driverConfig;
this.ingestConfig = ingestConfig;
}
@JsonProperty
@JsonProperty("dataSchema")
public DataSchema getDataSchema()
{
return dataSchema;
}
@JsonProperty
@JsonProperty("io")
public IOConfig getIoConfig()
{
return ioConfig;
}
@JsonProperty("config")
public DriverConfig getDriverConfig()
{
return driverConfig;
}
@JsonProperty
public IngestConfig getIngestConfig()
{
return ingestConfig;
}
}
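A minimal sketch of the reworked spec layout, exercising the renamed creator properties ("dataSchema", "io", "driverConfig") and the driverConfig type name that changed from "uniform" to "realtime" in this commit. The example class name and values are made up; it assumes jackson-databind plus the jackson-datatype-joda module so the ISO period string binds to a Joda Period (Druid's own configured ObjectMapper would normally handle that):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import io.druid.segment.indexing.IngestionSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;

public class IngestionSchemaExample
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(new JodaModule()); // lets "PT10M" bind to a Joda Period

    // Only driverConfig is supplied; dataSchema and io are omitted here and come back null.
    IngestionSchema spec = mapper.readValue(
        "{\"driverConfig\": {\"type\": \"realtime\","
        + " \"maxRowsInMemory\": 500000,"
        + " \"intermediatePersistPeriod\": \"PT10M\"}}",
        IngestionSchema.class
    );

    RealtimeDriverConfig driverConfig = (RealtimeDriverConfig) spec.getDriverConfig();
    System.out.println(driverConfig.getIntermediatePersistPeriod()); // PT10M
  }
}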

View File

@ -21,35 +21,23 @@ package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.segment.realtime.plumber.RejectionPolicyFactory;
import org.joda.time.Period;
import java.io.File;
/**
*/
public class RealtimeDriverConfig implements DriverConfig
{
private final int maxRowsInMemory;
private final Period intermediatePersistPeriod;
private final Period windowPeriod;
private final File basePersistDirectory;
private final RejectionPolicyFactory rejectionPolicyFactory;
@JsonCreator
public RealtimeDriverConfig(
@JsonProperty("maxRowsInMemory") int maxRowsInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod
)
{
this.maxRowsInMemory = maxRowsInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod;
this.windowPeriod = windowPeriod;
this.basePersistDirectory = basePersistDirectory;
this.rejectionPolicyFactory = rejectionPolicyFactory;
}
@JsonProperty
@ -63,22 +51,4 @@ public class RealtimeDriverConfig implements DriverConfig
{
return intermediatePersistPeriod;
}
@JsonProperty
public Period getWindowPeriod()
{
return windowPeriod;
}
@JsonProperty
public File getBasePersistDirectory()
{
return basePersistDirectory;
}
@JsonProperty
public RejectionPolicyFactory getRejectionPolicyFactory()
{
return rejectionPolicyFactory;
}
}

View File

@ -0,0 +1,36 @@
package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.FirehoseFactory;
import io.druid.segment.realtime.plumber.PlumberSchool;
/**
*/
public class RealtimeIOConfig implements IOConfig
{
private final FirehoseFactory firehoseFactory;
private final PlumberSchool plumberSchool;
@JsonCreator
public RealtimeIOConfig(
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("plumber") PlumberSchool plumberSchool
)
{
this.firehoseFactory = firehoseFactory;
this.plumberSchool = plumberSchool;
}
@JsonProperty("firehose")
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
}
@JsonProperty("plumber")
public PlumberSchool getPlumberSchool()
{
return plumberSchool;
}
}

View File

@ -21,8 +21,14 @@ package io.druid.segment.realtime;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.GranularitySpec;
import io.druid.segment.indexing.IngestionSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
@ -30,39 +36,67 @@ import java.io.IOException;
/**
* A Fire Department has a Firehose and a Plumber.
*
* <p/>
* This is a metaphor for a realtime stream (Firehose) and a coordinator of sinks (Plumber). The Firehose provides the
* realtime stream of data. The Plumber directs each drop of water from the firehose into the correct sink and makes
* sure that the sinks don't overflow.
*/
public class FireDepartment
public class FireDepartment extends IngestionSchema
{
@JsonProperty("schema")
private final Schema schema;
@JsonProperty
private final FireDepartmentConfig config;
@JsonProperty
private final FirehoseFactory firehoseFactory;
@JsonProperty
private final PlumberSchool plumberSchool;
private final DataSchema dataSchema;
private final RealtimeIOConfig ioConfig;
private final RealtimeDriverConfig driverConfig;
private final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
@JsonCreator
public FireDepartment(
@JsonProperty("schema") Schema schema,
@JsonProperty("config") FireDepartmentConfig config,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("plumber") PlumberSchool plumberSchool
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("io") RealtimeIOConfig ioConfig,
@JsonProperty("driverConfig") RealtimeDriverConfig driverConfig,
// Backwards Compatibility
@JsonProperty("schema") Schema schema,
@JsonProperty("config") FireDepartmentConfig config,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("plumber") PlumberSchool plumberSchool
)
{
this.schema = schema;
this.config = config;
this.firehoseFactory = firehoseFactory;
this.plumberSchool = plumberSchool;
super(dataSchema, ioConfig, driverConfig);
// Backwards compatibility
if (dataSchema == null) {
Preconditions.checkNotNull(schema, "schema");
Preconditions.checkNotNull(config, "config");
Preconditions.checkNotNull(firehoseFactory, "firehoseFactory");
Preconditions.checkNotNull(plumberSchool, "plumberSchool");
this.dataSchema = new DataSchema(
schema.getDataSource(),
firehoseFactory.getParser(),
schema.getAggregators(),
new GranularitySpec(
plumberSchool.getSegmentGranularity(),
schema.getIndexGranularity()
),
schema.getShardSpec()
);
this.ioConfig = new RealtimeIOConfig(
firehoseFactory,
plumberSchool
);
this.driverConfig = new RealtimeDriverConfig(
config.getMaxRowsInMemory(),
config.getIntermediatePersistPeriod()
);
} else {
Preconditions.checkNotNull(dataSchema, "dataSchema");
Preconditions.checkNotNull(ioConfig, "ioConfig");
Preconditions.checkNotNull(driverConfig, "driverConfig");
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.driverConfig = driverConfig;
}
}
/**
@ -70,24 +104,24 @@ public class FireDepartment
*
* @return the Schema for this feed.
*/
public Schema getSchema()
public DataSchema getSchema()
{
return schema;
return dataSchema;
}
public FireDepartmentConfig getConfig()
public RealtimeDriverConfig getConfig()
{
return config;
return driverConfig;
}
public Plumber findPlumber()
{
return plumberSchool.findPlumber(schema, metrics);
return ioConfig.getPlumberSchool().findPlumber(dataSchema, metrics);
}
public Firehose connect() throws IOException
{
return firehoseFactory.connect();
return ioConfig.getFirehoseFactory().connect(dataSchema.getParser());
}
public FireDepartmentMetrics getMetrics()

View File

@ -39,6 +39,9 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Sink;
import org.joda.time.DateTime;
@ -77,7 +80,7 @@ public class RealtimeManager implements QuerySegmentWalker
public void start() throws IOException
{
for (final FireDepartment fireDepartment : fireDepartments) {
Schema schema = fireDepartment.getSchema();
DataSchema schema = fireDepartment.getSchema();
final FireChief chief = new FireChief(fireDepartment);
chiefs.put(schema.getDataSource(), chief);
@ -126,7 +129,7 @@ public class RealtimeManager implements QuerySegmentWalker
private final FireDepartment fireDepartment;
private final FireDepartmentMetrics metrics;
private volatile FireDepartmentConfig config = null;
private volatile RealtimeDriverConfig config = null;
private volatile Firehose firehose = null;
private volatile Plumber plumber = null;
private volatile boolean normalExit = true;

View File

@ -34,6 +34,7 @@ import java.util.List;
/**
*/
@Deprecated
public class Schema
{
private final String dataSource;

View File

@ -8,6 +8,7 @@ import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
import io.druid.server.coordination.DataSegmentAnnouncer;
@ -39,7 +40,7 @@ public class FlushingPlumber extends RealtimePlumber
Period windowPeriod,
File basePersistDirectory,
SegmentGranularity segmentGranularity,
Schema schema,
DataSchema schema,
FireDepartmentMetrics metrics,
RejectionPolicy rejectionPolicy,
ServiceEmitter emitter,

View File

@ -28,6 +28,7 @@ import com.metamx.emitter.service.ServiceEmitter;
import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
import io.druid.server.coordination.DataSegmentAnnouncer;
@ -96,8 +97,7 @@ public class FlushingPlumberSchool implements PlumberSchool
}
@Override
public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics)
public Plumber findPlumber(final DataSchema schema, final FireDepartmentMetrics metrics)
{
verifyState();
@ -121,6 +121,12 @@ public class FlushingPlumberSchool implements PlumberSchool
);
}
@Override
public SegmentGranularity getSegmentGranularity()
{
return segmentGranularity;
}
private void verifyState()
{
Preconditions.checkNotNull(conglomerate, "must specify a queryRunnerFactoryConglomerate to do this action.");

View File

@ -21,8 +21,13 @@ package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
import org.joda.time.Period;
import java.io.File;
/**
*/
@ -38,5 +43,7 @@ public interface PlumberSchool
*
* @return returns a plumber
*/
public Plumber findPlumber(Schema schema, FireDepartmentMetrics metrics);
public Plumber findPlumber(DataSchema schema, FireDepartmentMetrics metrics);
public SegmentGranularity getSegmentGranularity();
}

View File

@ -35,6 +35,7 @@ import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
@ -72,7 +73,7 @@ public class RealtimePlumber implements Plumber
private final Period windowPeriod;
private final File basePersistDirectory;
private final SegmentGranularity segmentGranularity;
private final Schema schema;
private final DataSchema schema;
private final FireDepartmentMetrics metrics;
private final RejectionPolicy rejectionPolicy;
private final ServiceEmitter emitter;
@ -100,7 +101,7 @@ public class RealtimePlumber implements Plumber
Period windowPeriod,
File basePersistDirectory,
SegmentGranularity segmentGranularity,
Schema schema,
DataSchema schema,
FireDepartmentMetrics metrics,
RejectionPolicy rejectionPolicy,
ServiceEmitter emitter,
@ -131,7 +132,7 @@ public class RealtimePlumber implements Plumber
this.maxPendingPersists = maxPendingPersists;
}
public Schema getSchema()
public DataSchema getSchema()
{
return schema;
}
@ -632,12 +633,12 @@ public class RealtimePlumber implements Plumber
}
}
protected File computeBaseDir(Schema schema)
protected File computeBaseDir(DataSchema schema)
{
return new File(basePersistDirectory, schema.getDataSource());
}
protected File computePersistDir(Schema schema, Interval interval)
protected File computePersistDir(DataSchema schema, Interval interval)
{
return new File(computeBaseDir(schema), interval.toString().replace("/", "_"));
}
@ -651,7 +652,7 @@ public class RealtimePlumber implements Plumber
*
* @return the number of rows persisted
*/
protected int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval)
protected int persistHydrant(FireHydrant indexToPersist, DataSchema schema, Interval interval)
{
if (indexToPersist.hasSwapped()) {
log.info(

View File

@ -29,6 +29,7 @@ import io.druid.client.ServerView;
import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
@ -158,7 +159,13 @@ public class RealtimePlumberSchool implements PlumberSchool
}
@Override
public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics)
public SegmentGranularity getSegmentGranularity()
{
return segmentGranularity;
}
@Override
public Plumber findPlumber(final DataSchema schema, final FireDepartmentMetrics metrics)
{
verifyState();

View File

@ -31,6 +31,7 @@ import io.druid.data.input.InputRow;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.Schema;
import io.druid.timeline.DataSegment;
@ -51,13 +52,13 @@ public class Sink implements Iterable<FireHydrant>
private volatile FireHydrant currIndex;
private final Interval interval;
private final Schema schema;
private final DataSchema schema;
private final String version;
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>();
public Sink(
Interval interval,
Schema schema,
DataSchema schema,
String version
)
{
@ -70,7 +71,7 @@ public class Sink implements Iterable<FireHydrant>
public Sink(
Interval interval,
Schema schema,
DataSchema schema,
String version,
List<FireHydrant> hydrants
)
@ -164,13 +165,13 @@ public class Sink implements Iterable<FireHydrant>
);
}
private FireHydrant makeNewCurrIndex(long minTimestamp, Schema schema)
private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
{
IncrementalIndex newIndex = new IncrementalIndex(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(minTimestamp)
.withQueryGranularity(schema.getIndexGranularity())
.withSpatialDimensions(schema.getSpatialDimensions())
.withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
.withSpatialDimensions(schema.getParser().getParseSpec().getDimensionsSpec().getSpatialDimensions())
.withMetrics(schema.getAggregators())
.build()
);

View File

@ -22,6 +22,7 @@ package io.druid.segment.realtime;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
@ -31,6 +32,9 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.GranularitySpec;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.Sink;
@ -72,11 +76,21 @@ public class RealtimeManagerTest
makeRow(new DateTime("9000-01-01").getMillis()), makeRow(new DateTime().getMillis())
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, new DateTime().toString()));
DataSchema dataSchema = new DataSchema(
schema.getDataSource(),
null,
null,
null,
schema.getAggregators(),
new GranularitySpec(null, schema.getIndexGranularity()),
schema.getShardSpec()
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), dataSchema, new DateTime().toString()));
realtimeManager = new RealtimeManager(
Arrays.<FireDepartment>asList(
new FireDepartment(
null, null, null,
schema,
new FireDepartmentConfig(1, new Period("P1Y")),
new FirehoseFactory()
@ -86,16 +100,28 @@ public class RealtimeManagerTest
{
return new TestFirehose(rows.iterator());
}
@Override
public ByteBufferInputRowParser getParser()
{
throw new UnsupportedOperationException();
}
},
new PlumberSchool()
{
@Override
public Plumber findPlumber(
Schema schema, FireDepartmentMetrics metrics
DataSchema schema, FireDepartmentMetrics metrics
)
{
return plumber;
}
@Override
public SegmentGranularity getSegmentGranularity()
{
throw new UnsupportedOperationException();
}
}
)
),