mirror of https://github.com/apache/druid.git
synced 2025-02-09 03:24:55 +00:00

Addressed Gian's code review
This commit is contained in:
parent 017d4779d6
commit cb845e6f09
@@ -45,7 +45,6 @@ import com.metamx.druid.index.v1.IndexIO;
 import com.metamx.druid.index.v1.IndexMerger;
 import com.metamx.druid.indexer.rollup.DataRollupSpec;
 import com.metamx.druid.input.MapBasedInputRow;
-import com.metamx.druid.jackson.DefaultObjectMapper;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -70,8 +69,6 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.tweak.HandleCallback;
 
 import javax.annotation.Nullable;
 import java.io.BufferedOutputStream;
@@ -185,7 +182,7 @@ public class IndexGeneratorJob implements Jobby
       for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
         final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
         publishedSegmentsBuilder.add(segment);
-        log.info("Published %s", segment.getIdentifier());
+        log.info("Adding segment %s to the list of published segments", segment.getIdentifier());
       }
     }
     catch (IOException e) {
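For context, this hunk sits inside IndexGeneratorJob.getPublishedSegments(), which scans the descriptor-info directory on HDFS and deserializes each descriptor file into a DataSegment. A minimal sketch of that pattern follows; the makeDescriptorInfoDir() accessor and the Druid-internal import paths for DataSegment and the logger are hypothetical stand-ins, since the hunk only shows the loop body:

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.List;

public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
{
  final ObjectMapper jsonMapper = new DefaultObjectMapper();
  final ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
  try {
    final Path descriptorInfoDir = config.makeDescriptorInfoDir(); // hypothetical accessor
    final FileSystem fs = descriptorInfoDir.getFileSystem(new Configuration());
    // Each descriptor file in the directory is the JSON form of one published segment.
    for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
      final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
      publishedSegmentsBuilder.add(segment);
      log.info("Adding segment %s to the list of published segments", segment.getIdentifier());
    }
  }
  catch (IOException e) {
    throw Throwables.propagate(e); // surface HDFS errors to the caller
  }
  return publishedSegmentsBuilder.build();
}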
@@ -26,9 +26,10 @@ public class HadoopIndexTask extends AbstractTask
 
   /**
    * @param config is used by the HadoopDruidIndexerJob to set up the appropriate parameters
-   * for creating Druid index segments. It may be modified. Here, we will ensure that the
-   * UpDaterJobSpec field of the config is set to null, such that the job does not push a
-   * list of published segments the database. Instead, we will use the method
+   * for creating Druid index segments. It may be modified.
+   * <p/>
+   * Here, we will ensure that the UpDaterJobSpec field of the config is set to null, such that the
+   * job does not push a list of published segments the database. Instead, we will use the method
    * IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
    * segments, and let the indexing service report these segments to the database.
    */
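The javadoc spells out the division of labor: the Hadoop job must not write segment metadata to the database itself (hence UpDaterJobSpec stays null); the task collects the published segments afterwards and lets the indexing service do the reporting. A hedged sketch of that flow, in which the HadoopDruidIndexerJob constructor signature and the toolbox.publishSegments() reporting hook are assumptions, not code from this commit:

// Hedged sketch of the flow described in the javadoc, not the actual run() body.
HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config); // config's UpDaterJobSpec is null
job.run(); // builds and pushes segment files, but does not touch the DB

// The task, not the Hadoop job, hands the segment list to the indexing service.
List<DataSegment> published = IndexGeneratorJob.getPublishedSegments(config);
toolbox.publishSegments(published); // hypothetical reporting hook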
@@ -37,13 +38,15 @@ public class HadoopIndexTask extends AbstractTask
   public HadoopIndexTask(@JsonProperty("config") HadoopDruidIndexerConfig config)
   {
     super(
-        String.format("index_hadoop_%s_interval_%s", config.getDataSource(), new DateTime().now()),
+        String.format("index_hadoop_%s_interval_%s", config.getDataSource(), new DateTime()),
         config.getDataSource(),
         JodaUtils.umbrellaInterval(config.getIntervals())
     );
 
     if (config.isUpdaterJobSpecSet()) {
-      throw new IllegalArgumentException("UpDaterJobSpec is defined");
+      throw new IllegalArgumentException(
+          "The UpDaterJobSpec field of the Hadoop Druid indexer config must be set to null "
+      );
     }
     this.config = config;
   }
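Two fixes land in this hunk. First, `new DateTime().now()` was calling Joda-Time's static now() factory through a throwaway instance; `new DateTime()` alone already captures the current instant, so the extra call only built an object to discard it. Second, the terse "UpDaterJobSpec is defined" message is replaced with one that tells the caller what the config actually requires. For illustration of the DateTime point:

import org.joda.time.DateTime;

// Equivalent ways to capture the current instant in Joda-Time:
DateTime viaConstructor = new DateTime(); // what the fixed code uses
DateTime viaFactory = DateTime.now();     // static factory, Joda-Time 2.0+

// The old form compiled only because Java permits calling a static method
// through an instance; `new DateTime().now()` created one DateTime, threw it
// away, and returned another.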
@@ -51,7 +54,7 @@ public class HadoopIndexTask extends AbstractTask
   @Override
   public Type getType()
   {
-    return Type.INDEX;
+    return Type.HADOOPINDEX;
   }
 
   @Override
@@ -60,7 +63,7 @@ public class HadoopIndexTask extends AbstractTask
     log.info("Setting version to: %s", context.getVersion());
     config.setVersion(context.getVersion());
 
-    if(toolbox.getSegmentPusher() instanceof S3SegmentPusher) {
+    if (toolbox.getSegmentPusher() instanceof S3SegmentPusher) {
       // Hack alert! Bypassing SegmentPusher...
       S3SegmentPusher segmentPusher = (S3SegmentPusher) toolbox.getSegmentPusher();
       String s3Path = String.format(
@@ -47,6 +47,7 @@ public interface Task
   enum Type
   {
     INDEX,
+    HADOOPINDEX,
     MERGE,
     APPEND,
     DELETE,
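With HADOOPINDEX added to the Task.Type enum, the getType() change above lets Hadoop-based indexing identify itself as its own kind of work instead of piggybacking on INDEX. A small illustrative sketch (the runsOnHadoop() helper is hypothetical, not part of the commit) of how a dispatcher could use the distinction:

// Illustrative helper: routing decisions can now key off a dedicated type
// instead of conflating Hadoop and local indexing.
static boolean runsOnHadoop(Task task)
{
  return task.getType() == Task.Type.HADOOPINDEX;
}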