mirror of https://github.com/apache/druid.git

Commit 6265613bb9: Merge branch 'master' into offheap-incremental-index
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.117-SNAPSHOT</version>
+        <version>0.6.121-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.117-SNAPSHOT</version>
+        <version>0.6.121-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -19,13 +19,13 @@ Clone Druid and build it:
 git clone https://github.com/metamx/druid.git druid
 cd druid
 git fetch --tags
-git checkout druid-0.6.116
+git checkout druid-0.6.119
 ./build.sh
 ```

 ### Downloading the DSK (Druid Standalone Kit)

-[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.116-bin.tar.gz) a stand-alone tarball and run it:
+[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.119-bin.tar.gz) a stand-alone tarball and run it:

 ``` bash
 tar -xzf druid-services-0.X.X-bin.tar.gz
@@ -5,9 +5,9 @@ layout: doc_page
 ## What should I set my JVM heap?
 The size of the JVM heap really depends on the type of Druid node you are running. Below are a few considerations.

-[Broker nodes](Broker.html) can use the JVM heap as a query cache and thus, the size of the heap will affect on the number of results that can be cached. Broker nodes do not require off-heap memory and generally, heap sizes can be set to be close to the maximum memory on the machine (leaving some room for JVM overhead). The heap is used to merge results from different real-time and historical nodes, along with other computational processing.
+[Broker nodes](Broker.html) uses the JVM heap mainly to merge results from historicals and real-times. Brokers also use off-heap memory and processing threads for groupBy queries.

-[Historical nodes](Historical.html) use off-heap memory to store intermediate results, and by default, all segments are memory mapped before they can be queried. The more off-heap memory is available, the more segments can be served without the possibility of data being paged onto disk. On historicals, the JVM heap is used for [GroupBy queries](GroupByQuery.html), some data structures used for intermediate computation, and general processing.
+[Historical nodes](Historical.html) use off-heap memory to store intermediate results, and by default, all segments are memory mapped before they can be queried. Typically, the more memory is available on a historical node, the more segments can be served without the possibility of data being paged on to disk. On historicals, the JVM heap is used for [GroupBy queries](GroupByQuery.html), some data structures used for intermediate computation, and general processing. One way to calculate how much space there is for segments is: memory_for_segments = total_memory - heap - direct_memory - jvm_overhead.

 [Coordinator nodes](Coordinator nodes) do not require off-heap memory and the heap is used for loading information about all segments to determine what segments need to be loaded, dropped, moved, or replicated.
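The heap-sizing hunk above introduces the rule of thumb memory_for_segments = total_memory - heap - direct_memory - jvm_overhead. A minimal worked example follows, with entirely hypothetical numbers rather than recommendations:

```java
// Hypothetical sizing for a historical node, only to illustrate the formula above.
public class SegmentMemoryEstimate
{
  public static void main(String[] args)
  {
    long totalMemoryGb = 64;  // physical RAM on the node (assumed)
    long heapGb = 8;          // JVM heap, e.g. -Xmx8g (assumed)
    long directMemoryGb = 16; // off-heap/direct memory for processing (assumed)
    long jvmOverheadGb = 1;   // rough allowance for JVM internals (assumed)

    long memoryForSegments = totalMemoryGb - heapGb - directMemoryGb - jvmOverheadGb;
    System.out.println("memory_for_segments ~= " + memoryForSegments + " GB"); // prints 39 GB
  }
}
```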
@@ -55,7 +55,7 @@ druid.host=#{IP_ADDR}:8080
 druid.port=8080
 druid.service=druid/prod/overlord

-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.116"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.119"]

 druid.zk.service.host=#{ZK_IPs}
 druid.zk.paths.base=/druid/prod

@@ -137,7 +137,7 @@ druid.host=#{IP_ADDR}:8080
 druid.port=8080
 druid.service=druid/prod/middlemanager

-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.116","io.druid.extensions:druid-kafka-seven:0.6.116"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.119","io.druid.extensions:druid-kafka-seven:0.6.119"]

 druid.zk.service.host=#{ZK_IPs}
 druid.zk.paths.base=/druid/prod

@@ -285,7 +285,7 @@ druid.host=#{IP_ADDR}:8080
 druid.port=8080
 druid.service=druid/prod/historical

-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.116"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.119"]

 druid.zk.service.host=#{ZK_IPs}
 druid.zk.paths.base=/druid/prod
@@ -27,7 +27,7 @@ druid.host=localhost
 druid.service=realtime
 druid.port=8083

-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.116"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.119"]


 druid.zk.service.host=localhost

@@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
 druid.port=8080
 druid.service=druid/prod/realtime

-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.116","io.druid.extensions:druid-kafka-seven:0.6.116"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.119","io.druid.extensions:druid-kafka-seven:0.6.119"]

 druid.zk.service.host=#{ZK_IPs}
 druid.zk.paths.base=/druid/prod
@@ -28,7 +28,7 @@ Configuration:

 -Ddruid.zk.service.host=localhost

--Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.116"]
+-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.119"]

 -Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
 -Ddruid.db.connector.user=druid
@@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu

 ### Download a Tarball

-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.116-bin.tar.gz). Download this file to a directory of your choosing.
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.119-bin.tar.gz). Download this file to a directory of your choosing.

 You can extract the awesomeness within by issuing:

@@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
 Not too lost so far right? That's great! If you cd into the directory:

 ```
-cd druid-services-0.6.116
+cd druid-services-0.6.119
 ```

 You should see a bunch of files:
@@ -96,7 +96,7 @@ The configurations for the overlord node are as follows:

 -Ddruid.zk.service.host=localhost

--Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.116"]
+-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.119"]

 -Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
 -Ddruid.db.connector.user=druid
@@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende

 If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.

-You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.116-bin.tar.gz)
+You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.119-bin.tar.gz)

 and untar the contents within by issuing:

@@ -149,7 +149,7 @@ druid.port=8081

 druid.zk.service.host=localhost

-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.116"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.119"]

 # Dummy read only AWS account (used to download example data)
 druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

@@ -240,7 +240,7 @@ druid.port=8083

 druid.zk.service.host=localhost

-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.116","io.druid.extensions:druid-kafka-seven:0.6.116"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.119","io.druid.extensions:druid-kafka-seven:0.6.119"]

 # Change this config to db to hand off to the rest of the Druid cluster
 druid.publish.type=noop
@@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu

 h3. Download a Tarball

-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.116-bin.tar.gz)
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.119-bin.tar.gz)
 Download this file to a directory of your choosing.
 You can extract the awesomeness within by issuing:

@@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
 Not too lost so far right? That's great! If you cd into the directory:

 ```
-cd druid-services-0.6.116
+cd druid-services-0.6.119
 ```

 You should see a bunch of files:
@@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.

 # Download a Tarball

-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.116-bin.tar.gz).
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.119-bin.tar.gz).
 Download this bad boy to a directory of your choosing.

 You can extract the awesomeness within by issuing:
@@ -4,7 +4,7 @@ druid.port=8081

 druid.zk.service.host=localhost

-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.116"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.119"]

 # Dummy read only AWS account (used to download example data)
 druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@@ -9,7 +9,7 @@

 -Ddruid.zk.service.host=localhost

--Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.116"]
+-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.119"]

 -Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
 -Ddruid.db.connector.user=druid
@@ -4,7 +4,7 @@ druid.port=8083

 druid.zk.service.host=localhost

-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.116","io.druid.extensions:druid-kafka-seven:0.6.116","io.druid.extensions:druid-rabbitmq:0.6.116"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.119","io.druid.extensions:druid-kafka-seven:0.6.119","io.druid.extensions:druid-rabbitmq:0.6.119"]

 # Change this config to db to hand off to the rest of the Druid cluster
 druid.publish.type=noop
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.117-SNAPSHOT</version>
+        <version>0.6.121-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.117-SNAPSHOT</version>
+        <version>0.6.121-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.117-SNAPSHOT</version>
+        <version>0.6.121-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -33,8 +33,8 @@ import com.metamx.common.logger.Logger;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.Rows;
 import io.druid.granularity.QueryGranularity;
-import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.apache.hadoop.conf.Configurable;
@@ -48,8 +48,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.joda.time.DateTime;
@@ -92,7 +92,11 @@ public class DetermineHashedPartitionsJob implements Jobby
      );

      JobHelper.injectSystemProperties(groupByJob);
+     if (config.isCombineText()) {
+       groupByJob.setInputFormatClass(CombineTextInputFormat.class);
+     } else {
       groupByJob.setInputFormatClass(TextInputFormat.class);
+     }
      groupByJob.setMapperClass(DetermineCardinalityMapper.class);
      groupByJob.setMapOutputKeyClass(LongWritable.class);
      groupByJob.setMapOutputValueClass(BytesWritable.class);
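Both DetermineHashedPartitionsJob above and IndexGeneratorJob further down now pick the Hadoop input format from the new combineText flag. A condensed sketch of that selection follows, with a helper name of our own choosing (the real code calls setInputFormatClass inline inside each job):

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class InputFormatSelection
{
  // With TextInputFormat, many small input files each get their own mapper;
  // CombineTextInputFormat packs several small files into one split instead,
  // which is the point of the new combineText tuning flag.
  static void configureInputFormat(Job job, boolean combineText)
  {
    if (combineText) {
      job.setInputFormatClass(CombineTextInputFormat.class);
    } else {
      job.setInputFormatClass(TextInputFormat.class);
    }
  }
}
```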
@@ -37,7 +37,6 @@ import com.google.inject.Module;
 import com.metamx.common.ISE;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;
 import com.metamx.common.parsers.TimestampParser;
 import io.druid.common.utils.JodaUtils;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.StringInputRowParser;
@@ -111,7 +110,17 @@ public class HadoopDruidIndexerConfig

   public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
   {
+    //backwards compatibility
+    if (argSpec.containsKey("schema")) {
+      return HadoopDruidIndexerConfig.jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
+    } else {
       return new HadoopDruidIndexerConfig(
           HadoopDruidIndexerConfig.jsonMapper.convertValue(
               argSpec,
               HadoopIngestionSpec.class
           )
       );
+    }
   }

   @SuppressWarnings("unchecked")
@@ -246,6 +255,11 @@ public class HadoopDruidIndexerConfig
     return (schema.getIOConfig().getMetadataUpdateSpec() != null);
   }

+  public boolean isCombineText()
+  {
+    return schema.getTuningConfig().isCombineText();
+  }
+
   public StringInputRowParser getParser()
   {
     return (StringInputRowParser) schema.getDataSchema().getParser();
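The fromMap change above routes on a single top-level "schema" key. A hypothetical caller-side sketch of the two map shapes it distinguishes (the inner contents are placeholders; only the presence or absence of "schema" matters, as in the diff):

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class FromMapShapes
{
  public static void main(String[] args)
  {
    // Legacy shape: the whole configuration wrapped under a top-level "schema" key,
    // which fromMap binds directly to HadoopDruidIndexerConfig.
    Map<String, Object> legacyShape = ImmutableMap.<String, Object>of(
        "schema", ImmutableMap.of() // placeholder for the legacy config fields
    );

    // Current shape: the map is the ingestion spec itself, with no "schema" key,
    // so fromMap converts it to HadoopIngestionSpec and wraps it.
    Map<String, Object> specShape = ImmutableMap.<String, Object>of(); // placeholder spec fields

    System.out.println("legacy? " + legacyShape.containsKey("schema")); // true
    System.out.println("legacy? " + specShape.containsKey("schema"));   // false
  }
}
```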
@@ -73,6 +73,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
       final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
       final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
       final @JsonProperty("jobProperties") Map<String, String> jobProperties,
+      final @JsonProperty("combineText") boolean combineText,
       // These fields are deprecated and will be removed in the future
       final @JsonProperty("timestampColumn") String timestampColumn,
       final @JsonProperty("timestampFormat") String timestampFormat,

@@ -138,7 +139,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
       this.dataSchema = new DataSchema(
           dataSource,
           new StringInputRowParser(
-              dataSpec == null ? null : dataSpec.toParseSpec(timestampSpec, dimensionExclusions),
+              dataSpec == null ? null : dataSpec.toParseSpec(theTimestampSpec, dimensionExclusions),
               null, null, null, null
           ),
           rollupSpec == null

@@ -163,7 +164,8 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
           cleanupOnFailure,
           overwriteFiles,
           ignoreInvalidRows,
-          jobProperties
+          jobProperties,
+          combineText
       );
     }
   }

@@ -212,6 +214,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
         null,
         false,
         null,
+        false,
         null,
         null,
         null,

@@ -244,6 +247,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
         null,
         false,
         null,
+        false,
         null,
         null,
         null,

@@ -276,6 +280,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
         null,
         false,
         null,
+        false,
         null,
         null,
         null,
@@ -52,7 +52,8 @@ public class HadoopTuningConfig implements TuningConfig
         true,
         false,
         false,
-        null
+        null,
+        false
     );
   }

@@ -66,6 +67,7 @@ public class HadoopTuningConfig implements TuningConfig
   private final boolean overwriteFiles;
   private final boolean ignoreInvalidRows;
   private final Map<String, String> jobProperties;
+  private final boolean combineText;

   @JsonCreator
   public HadoopTuningConfig(

@@ -78,7 +80,8 @@ public class HadoopTuningConfig implements TuningConfig
       final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
       final @JsonProperty("overwriteFiles") boolean overwriteFiles,
       final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
-      final @JsonProperty("jobProperties") Map<String, String> jobProperties
+      final @JsonProperty("jobProperties") Map<String, String> jobProperties,
+      final @JsonProperty("combineText") boolean combineText
   )
   {
     this.workingPath = workingPath == null ? null : workingPath;

@@ -93,6 +96,7 @@ public class HadoopTuningConfig implements TuningConfig
     this.jobProperties = (jobProperties == null
                           ? ImmutableMap.<String, String>of()
                           : ImmutableMap.copyOf(jobProperties));
+    this.combineText = combineText;
   }

   @JsonProperty

@@ -155,6 +159,12 @@ public class HadoopTuningConfig implements TuningConfig
     return jobProperties;
   }

+  @JsonProperty
+  public boolean isCombineText()
+  {
+    return combineText;
+  }
+
   public HadoopTuningConfig withWorkingPath(String path)
   {
     return new HadoopTuningConfig(

@@ -167,7 +177,8 @@ public class HadoopTuningConfig implements TuningConfig
         cleanupOnFailure,
         overwriteFiles,
         ignoreInvalidRows,
-        jobProperties
+        jobProperties,
+        combineText
     );
   }

@@ -183,7 +194,8 @@ public class HadoopTuningConfig implements TuningConfig
         cleanupOnFailure,
         overwriteFiles,
         ignoreInvalidRows,
-        jobProperties
+        jobProperties,
+        combineText
     );
   }

@@ -199,7 +211,8 @@ public class HadoopTuningConfig implements TuningConfig
         cleanupOnFailure,
         overwriteFiles,
         ignoreInvalidRows,
-        jobProperties
+        jobProperties,
+        combineText
     );
   }
 }
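Because the new combineText constructor parameter in HadoopTuningConfig and HadoopIngestionSpec is a primitive boolean bound through @JsonProperty, specs that omit it should keep deserializing and simply default to false under Jackson's default settings. A standalone Jackson illustration of that behavior (not Druid code):

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CombineTextDefaultDemo
{
  private final boolean combineText;

  @JsonCreator
  public CombineTextDefaultDemo(@JsonProperty("combineText") boolean combineText)
  {
    this.combineText = combineText;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // Property absent: Jackson supplies the primitive default, so the flag is false.
    System.out.println(mapper.readValue("{}", CombineTextDefaultDemo.class).combineText);
    // Property present: the configured value is used.
    System.out.println(mapper.readValue("{\"combineText\": true}", CombineTextDefaultDemo.class).combineText);
  }
}
```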
@@ -62,6 +62,7 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.joda.time.DateTime;

@@ -146,7 +147,11 @@ public class IndexGeneratorJob implements Jobby

       JobHelper.injectSystemProperties(job);

+      if (config.isCombineText()) {
+        job.setInputFormatClass(CombineTextInputFormat.class);
+      } else {
       job.setInputFormatClass(TextInputFormat.class);
+      }

       job.setMapperClass(IndexGeneratorMapper.class);
       job.setMapOutputValueClass(Text.class);
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.117-SNAPSHOT</version>
+        <version>0.6.121-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -422,6 +422,7 @@ public class TaskSerdeTest
         null,
         false,
         ImmutableMap.of("foo", "bar"),
+        false,
         null,
         null,
         null,
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.117-SNAPSHOT</version>
+        <version>0.6.121-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.117-SNAPSHOT</version>
+        <version>0.6.121-SNAPSHOT</version>
     </parent>

     <dependencies>
pom.xml
@@ -23,7 +23,7 @@
     <groupId>io.druid</groupId>
     <artifactId>druid</artifactId>
     <packaging>pom</packaging>
-    <version>0.6.117-SNAPSHOT</version>
+    <version>0.6.121-SNAPSHOT</version>
     <name>druid</name>
     <description>druid</description>
     <scm>
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.117-SNAPSHOT</version>
+        <version>0.6.121-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -9,7 +9,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.117-SNAPSHOT</version>
+        <version>0.6.121-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.117-SNAPSHOT</version>
+        <version>0.6.121-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.117-SNAPSHOT</version>
+        <version>0.6.121-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.client.repackaged.com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;

@@ -79,7 +80,8 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
       )
       {
         Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
-        Set<DataSegment> filteredInventory = Sets.filter(inventory, predicate);
+        // make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory
+        Set<DataSegment> filteredInventory = Sets.newHashSet(Iterables.filter(inventory, predicate));

         zNodes.put(inventoryKey, filteredInventory);
         for (DataSegment segment : filteredInventory) {

@@ -94,7 +96,8 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
       )
       {
         Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
-        Set<DataSegment> filteredInventory = Sets.filter(inventory, predicate);
+        // make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory
+        Set<DataSegment> filteredInventory = Sets.newHashSet(Iterables.filter(inventory, predicate));

         Set<DataSegment> existing = zNodes.get(inventoryKey);
         if (existing == null) {
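The BatchServerInventoryView change above swaps a live filtered view for a materialized copy so the full inventory set does not have to stay in memory. A standalone Guava illustration of that view-versus-copy distinction (generic strings instead of DataSegment; the names are ours):

```java
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

import java.util.Set;

public class FilteredViewVsCopy
{
  public static void main(String[] args)
  {
    Set<String> inventory = Sets.newHashSet("a", "bb", "ccc");
    Predicate<String> shortNames = new Predicate<String>()
    {
      @Override
      public boolean apply(String input)
      {
        return input.length() <= 2;
      }
    };

    // Live view: holds on to the whole backing set and re-applies the predicate on access.
    Set<String> view = Sets.filter(inventory, shortNames);

    // Materialized copy: retains only the matching elements, so the backing set
    // (all the segment data, in Druid's case) does not have to stay reachable.
    Set<String> copy = Sets.newHashSet(Iterables.filter(inventory, shortNames));

    System.out.println(view); // the two short names (iteration order may vary)
    System.out.println(copy); // the two short names (iteration order may vary)
  }
}
```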
@@ -27,7 +27,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.117-SNAPSHOT</version>
+        <version>0.6.121-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -38,7 +38,7 @@ import java.util.List;
  */
 @Command(
     name = "hadoop-indexer",
-    description = "Runs the batch Hadoop Druid Indexer, see https://github.com/metamx/druid/wiki/Batch-ingestion for a description."
+    description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/latest/Batch-ingestion.html for a description."
 )
 public class CliInternalHadoopIndexer implements Runnable
 {

@@ -67,11 +67,10 @@ public class CliInternalHadoopIndexer implements Runnable
     try {
-      HadoopIngestionSpec spec;
       if (argumentSpec.startsWith("{")) {
-        spec = HadoopDruidIndexerConfig.jsonMapper.readValue(argumentSpec, HadoopIngestionSpec.class);
+        return HadoopDruidIndexerConfig.fromString(argumentSpec);
       } else {
-        spec = HadoopDruidIndexerConfig.jsonMapper.readValue(new File(argumentSpec), HadoopIngestionSpec.class);
+        return HadoopDruidIndexerConfig.fromFile(new File(argumentSpec));
       }
-      return HadoopDruidIndexerConfig.fromSchema(spec);
     }
     catch (Exception e) {
       throw Throwables.propagate(e);