Change how DatasourcePathSpec obtains its segment list, so that the Hadoop indexer uses an overlord action to get the list of segments when running as an overlord task, and reads the metadata store directly when running as a standalone Hadoop indexer.

Also, the serialized list of segments is passed to DatasourcePathSpec so that Hadoop classloader issues do not creep in.
Himanshu Gupta 2015-08-12 17:32:35 -05:00
parent 45947a1021
commit 15fa43dd43
12 changed files with 504 additions and 65 deletions
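For orientation, here is a minimal sketch, not part of the commit itself, of the two wiring modes the commit message describes. The class SegmentListWiringSketch and its method names are illustrative; the Druid types and calls are the ones introduced or touched by this change.

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.path.MetadataStoreBasedUsedSegmentLister;
import io.druid.indexer.path.UsedSegmentLister;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;

import java.io.IOException;

class SegmentListWiringSketch
{
  // Overlord task mode: the segment list is fetched via a task action through the toolbox,
  // so the task never talks to the metadata store directly.
  static HadoopIngestionSpec forOverlordTask(HadoopIngestionSpec spec, ObjectMapper mapper, TaskToolbox toolbox)
      throws IOException
  {
    UsedSegmentLister lister = new OverlordActionBasedUsedSegmentLister(toolbox);
    return HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(spec, mapper, lister);
  }

  // Standalone hadoop indexer mode: the segment list is read straight from the metadata store.
  static HadoopIngestionSpec forStandaloneIndexer(
      HadoopIngestionSpec spec,
      ObjectMapper mapper,
      IndexerMetadataStorageCoordinator coordinator
  ) throws IOException
  {
    UsedSegmentLister lister = new MetadataStoreBasedUsedSegmentLister(coordinator);
    return HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(spec, mapper, lister);
  }
}

In the commit, the first mode is wired up inside HadoopIndexTask and the second inside CliInternalHadoopIndexer; in both cases the returned spec already carries the serialized segment list, which is what keeps Hadoop classloader issues from creeping in.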

View File

@@ -18,6 +18,7 @@
package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -247,6 +248,12 @@ public class HadoopDruidIndexerConfig
return schema;
}
@JsonIgnore
public PathSpec getPathSpec()
{
return pathSpec;
}
public String getDataSource()
{
return schema.getDataSchema().getDataSource();

View File

@@ -19,8 +19,16 @@ package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.path.UsedSegmentLister;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
*/
@@ -91,4 +99,45 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
config
);
}
public static HadoopIngestionSpec updateSegmentListIfDatasourcePathSpecIsUsed(
HadoopIngestionSpec spec,
ObjectMapper jsonMapper,
UsedSegmentLister segmentLister
)
throws IOException
{
String dataSource = "dataSource";
String type = "type";
String multi = "multi";
String children = "children";
String segments = "segments";
String ingestionSpec = "ingestionSpec";
Map<String, Object> pathSpec = spec.getIOConfig().getPathSpec();
Map<String, Object> datasourcePathSpec = null;
if(pathSpec.get(type).equals(dataSource)) {
datasourcePathSpec = pathSpec;
} else if(pathSpec.get(type).equals(multi)) {
List<Map<String, Object>> childPathSpecs = (List<Map<String, Object>>) pathSpec.get(children);
for(Map<String, Object> childPathSpec : childPathSpecs) {
if (childPathSpec.get(type).equals(dataSource)) {
datasourcePathSpec = childPathSpec;
break;
}
}
}
if (datasourcePathSpec != null) {
Map<String, Object> ingestionSpecMap = (Map<String, Object>) datasourcePathSpec.get(ingestionSpec);
DatasourceIngestionSpec ingestionSpecObj = jsonMapper.convertValue(ingestionSpecMap, DatasourceIngestionSpec.class);
List<DataSegment> segmentsList = segmentLister.getUsedSegmentsForInterval(
ingestionSpecObj.getDataSource(),
ingestionSpecObj.getInterval()
);
datasourcePathSpec.put(segments, segmentsList);
}
return spec;
}
}
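To make the string keys handled by updateSegmentListIfDatasourcePathSpecIsUsed concrete, the following is an illustrative sketch (the class, datasource name, and interval are placeholders, not part of the commit) of the two pathSpec map shapes the helper rewrites.

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.Map;

class PathSpecShapeSketch
{
  // Shape 1: the pathSpec itself has type "dataSource"; the helper puts a "segments"
  // entry into this map after asking the UsedSegmentLister for the interval.
  static Map<String, Object> datasourcePathSpec()
  {
    Map<String, Object> ingestionSpec = Maps.newHashMap();
    ingestionSpec.put("dataSource", "wikipedia");            // placeholder
    ingestionSpec.put("interval", "2014-01-01/2014-02-01");  // placeholder

    Map<String, Object> pathSpec = Maps.newHashMap();
    pathSpec.put("type", "dataSource");
    pathSpec.put("ingestionSpec", ingestionSpec);
    return pathSpec;
  }

  // Shape 2: a "multi" pathSpec; the helper walks "children" looking for the first
  // child of type "dataSource" and updates that child in place.
  static Map<String, Object> multiPathSpec(Map<String, Object> otherChild)
  {
    Map<String, Object> pathSpec = Maps.newHashMap();
    pathSpec.put("type", "multi");
    pathSpec.put("children", Lists.newArrayList(otherChild, datasourcePathSpec()));
    return pathSpec;
  }
}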

View File

@@ -22,8 +22,8 @@ package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -31,7 +31,6 @@ import com.metamx.common.logger.Logger;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.DatasourceInputFormat;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.fs.Path;
@@ -46,21 +45,20 @@ public class DatasourcePathSpec implements PathSpec
{
private static final Logger logger = new Logger(DatasourcePathSpec.class);
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private ObjectMapper mapper;
private DatasourceIngestionSpec ingestionSpec;
private long maxSplitSize;
private final ObjectMapper mapper;
private final DatasourceIngestionSpec ingestionSpec;
private final long maxSplitSize;
private final List<DataSegment> segments;
public DatasourcePathSpec(
@JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
@JacksonInject ObjectMapper mapper,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("ingestionSpec") DatasourceIngestionSpec spec,
@JsonProperty("maxSplitSize") Long maxSplitSize
)
{
this.indexerMetadataStorageCoordinator = Preconditions.checkNotNull(indexerMetadataStorageCoordinator, "null indexerMetadataStorageCoordinator");
this.mapper = Preconditions.checkNotNull(mapper, "null mapper");
this.segments = segments;
this.ingestionSpec = Preconditions.checkNotNull(spec, "null ingestionSpec");
if(maxSplitSize == null) {
@@ -70,6 +68,12 @@ public class DatasourcePathSpec implements PathSpec
}
}
@JsonProperty
public List<DataSegment> getSegments()
{
return segments;
}
@JsonProperty
public DatasourceIngestionSpec getIngestionSpec()
{
@@ -87,10 +91,8 @@ public class DatasourcePathSpec implements PathSpec
HadoopDruidIndexerConfig config, Job job
) throws IOException
{
final List<DataSegment> segments = indexerMetadataStorageCoordinator.getUsedSegmentsForInterval(
ingestionSpec.getDataSource(),
ingestionSpec.getInterval()
);
Preconditions.checkArgument(segments != null && !segments.isEmpty(), "no segments provided");
logger.info(
"Found total [%d] segments for [%s] in interval [%s]",
segments.size(),
@@ -98,7 +100,8 @@ public class DatasourcePathSpec implements PathSpec
ingestionSpec.getInterval()
);
if (ingestionSpec.getDimensions() == null) {
DatasourceIngestionSpec updatedIngestionSpec = ingestionSpec;
if (updatedIngestionSpec.getDimensions() == null) {
List<String> dims;
if (config.getParser().getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
dims = config.getParser().getParseSpec().getDimensionsSpec().getDimensions();
@@ -128,10 +131,10 @@ public class DatasourcePathSpec implements PathSpec
)
);
}
ingestionSpec = ingestionSpec.withDimensions(dims);
updatedIngestionSpec = updatedIngestionSpec.withDimensions(dims);
}
if (ingestionSpec.getMetrics() == null) {
if (updatedIngestionSpec.getMetrics() == null) {
Set<String> metrics = Sets.newHashSet();
final AggregatorFactory[] cols = config.getSchema().getDataSchema().getAggregators();
if (cols != null) {
@@ -139,10 +142,10 @@ public class DatasourcePathSpec implements PathSpec
metrics.add(col.getName());
}
}
ingestionSpec = ingestionSpec.withMetrics(Lists.newArrayList(metrics));
updatedIngestionSpec = updatedIngestionSpec.withMetrics(Lists.newArrayList(metrics));
}
job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(ingestionSpec));
job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(updatedIngestionSpec));
job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments));
job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize));
MultipleInputs.addInputPath(job, new Path("/dummy/tobe/ignored"), DatasourceInputFormat.class);
@@ -165,7 +168,10 @@ public class DatasourcePathSpec implements PathSpec
if (maxSplitSize != that.maxSplitSize) {
return false;
}
return ingestionSpec.equals(that.ingestionSpec);
if (!ingestionSpec.equals(that.ingestionSpec)) {
return false;
}
return !(segments != null ? !segments.equals(that.segments) : that.segments != null);
}
@@ -174,6 +180,7 @@ public class DatasourcePathSpec implements PathSpec
{
int result = ingestionSpec.hashCode();
result = 31 * result + (int) (maxSplitSize ^ (maxSplitSize >>> 32));
result = 31 * result + (segments != null ? segments.hashCode() : 0);
return result;
}
}

View File

@@ -0,0 +1,53 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.path;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
/**
*/
public class MetadataStoreBasedUsedSegmentLister implements UsedSegmentLister
{
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
@Inject
public MetadataStoreBasedUsedSegmentLister(IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator)
{
this.indexerMetadataStorageCoordinator = Preconditions.checkNotNull(
indexerMetadataStorageCoordinator,
"null indexerMetadataStorageCoordinator"
);
}
@Override
public List<DataSegment> getUsedSegmentsForInterval(
String dataSource, Interval interval
) throws IOException
{
return indexerMetadataStorageCoordinator.getUsedSegmentsForInterval(dataSource, interval);
}
}

View File

@@ -0,0 +1,44 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.path;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
/**
*/
public interface UsedSegmentLister
{
/**
* Get all segments which may include any data in the interval and are flagged as used.
*
* @param dataSource The datasource to query
* @param interval The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive
*
* @return The DataSegments which include data in the requested interval. These segments may contain data outside the requested interval.
*
* @throws IOException
*/
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval)
throws IOException;
}
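A minimal caller sketch of the contract documented above; the class, datasource name, and interval are placeholders, and any UsedSegmentLister implementation (overlord-action based or metadata-store based) can be passed in.

import io.druid.indexer.path.UsedSegmentLister;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;

import java.io.IOException;
import java.util.List;

class UsedSegmentListerCallerSketch
{
  static void printUsedSegments(UsedSegmentLister lister) throws IOException
  {
    // Start of the interval is inclusive, end is exclusive.
    List<DataSegment> segments = lister.getUsedSegmentsForInterval("wikipedia", new Interval("2014-01-01/2014-02-01"));
    for (DataSegment segment : segments) {
      // Returned segments may also contain data outside the requested interval.
      System.out.println(segment.getIdentifier() + " covers " + segment.getInterval());
    }
  }
}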

View File

@@ -20,6 +20,7 @@ package io.druid.indexer.updater;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.PasswordProvider;
import javax.validation.constraints.NotNull;
@@ -78,4 +79,20 @@ public class MetadataStorageUpdaterJobSpec implements Supplier<MetadataStorageCo
}
};
}
//Note: Currently it only supports configured segmentTable, other tables should be added if needed
//by the code using this
public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
{
return new MetadataStorageTablesConfig(
null,
segmentTable,
null,
null,
null,
null,
null,
null
);
}
}
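For context, a small sketch of how the new getter is consumed; the module class here is illustrative, but the binding itself mirrors the one added to CliInternalHadoopIndexer later in this commit, which only needs the segments table.

import com.google.inject.Binder;
import com.google.inject.Module;
import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
import io.druid.metadata.MetadataStorageTablesConfig;

class TablesConfigBindingSketch implements Module
{
  private final MetadataStorageUpdaterJobSpec metadataSpec;

  TablesConfigBindingSketch(MetadataStorageUpdaterJobSpec metadataSpec)
  {
    this.metadataSpec = metadataSpec;
  }

  @Override
  public void configure(Binder binder)
  {
    // Only the segments table name is carried over; every other table name in the
    // resulting MetadataStorageTablesConfig is left null, as noted above.
    binder.bind(MetadataStorageTablesConfig.class).toInstance(metadataSpec.getMetadataStorageTablesConfig());
  }
}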

View File

@@ -0,0 +1,154 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.Granularity;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.path.DatasourcePathSpec;
import io.druid.indexer.path.MultiplePathSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.indexer.path.StaticPathSpec;
import io.druid.indexer.path.UsedSegmentLister;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
/**
*/
public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
{
private final String testDatasource = "test";
private final Interval testDatasourceInterval = new Interval("1970/2000");
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final List<DataSegment> segments = ImmutableList.of(
new DataSegment(
"test1",
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index1.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
2
)
);
@Test
public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithNoDatasourcePathSpec() throws Exception
{
PathSpec pathSpec = new StaticPathSpec("/xyz", null);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec);
Assert.assertTrue(config.getPathSpec() instanceof StaticPathSpec);
}
@Test
public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithJustDatasourcePathSpec() throws Exception
{
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec);
Assert.assertEquals(segments, ((DatasourcePathSpec) config.getPathSpec()).getSegments());
}
@Test
public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithMultiplePathSpec() throws Exception
{
PathSpec pathSpec = new MultiplePathSpec(
ImmutableList.of(
new StaticPathSpec("/xyz", null),
new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null),
null
)
)
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec);
Assert.assertEquals(
segments,
((DatasourcePathSpec) ((MultiplePathSpec) config.getPathSpec()).getChildren().get(1)).getSegments()
);
}
private HadoopDruidIndexerConfig testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
PathSpec datasourcePathSpec
)
throws Exception
{
HadoopIngestionSpec spec = new HadoopIngestionSpec(
new DataSchema(
"foo",
null,
new AggregatorFactory[0],
new UniformGranularitySpec(
Granularity.DAY,
null,
ImmutableList.of(
new Interval("2010-01-01/P1D")
)
)
),
new HadoopIOConfig(
jsonMapper.convertValue(datasourcePathSpec, Map.class),
null,
null
),
null
);
spec = jsonMapper.readValue(
jsonMapper.writeValueAsString(spec),
HadoopIngestionSpec.class
);
UsedSegmentLister segmentLister = EasyMock.createMock(UsedSegmentLister.class);
EasyMock.expect(
segmentLister.getUsedSegmentsForInterval(testDatasource, testDatasourceInterval)
).andReturn(segments);
EasyMock.replay(segmentLister);
spec = HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(spec, jsonMapper, segmentLister);
return HadoopDruidIndexerConfig.fromString(jsonMapper.writeValueAsString(spec));
}
}

View File

@@ -42,12 +42,10 @@ import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.HadoopTuningConfig;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.DatasourceInputFormat;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.initialization.Initialization;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.server.DruidNode;
@@ -67,6 +65,7 @@ import java.util.List;
public class DatasourcePathSpecTest
{
private DatasourceIngestionSpec ingestionSpec;
private List<DataSegment> segments;
public DatasourcePathSpecTest()
{
@@ -78,13 +77,44 @@ public class DatasourcePathSpecTest
null,
null
);
segments = ImmutableList.of(
new DataSegment(
ingestionSpec.getDataSource(),
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index.zip"
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12334
),
new DataSegment(
ingestionSpec.getDataSource(),
Interval.parse("2050/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", "/tmp/index.zip"
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12335
)
);
}
@Test
public void testSerde() throws Exception
{
final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator = EasyMock.createMock(
IndexerMetadataStorageCoordinator.class
final UsedSegmentLister segmentList = EasyMock.createMock(
UsedSegmentLister.class
);
Injector injector = Initialization.makeInjectorWithModules(
@@ -95,7 +125,7 @@ public class DatasourcePathSpecTest
@Override
public void configure(Binder binder)
{
binder.bind(IndexerMetadataStorageCoordinator.class).toInstance(indexerMetadataStorageCoordinator);
binder.bind(UsedSegmentLister.class).toInstance(segmentList);
JsonConfigProvider.bindInstance(
binder, Key.get(DruidNode.class, Self.class), new DruidNode("dummy-node", null, null)
);
@@ -107,8 +137,8 @@ public class DatasourcePathSpecTest
ObjectMapper jsonMapper = injector.getInstance(ObjectMapper.class);
DatasourcePathSpec expected = new DatasourcePathSpec(
indexerMetadataStorageCoordinator,
jsonMapper,
null,
ingestionSpec,
Long.valueOf(10)
);
@@ -116,8 +146,17 @@ public class DatasourcePathSpecTest
Assert.assertEquals(expected, actual);
expected = new DatasourcePathSpec(
indexerMetadataStorageCoordinator,
jsonMapper,
null,
ingestionSpec,
null
);
actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class);
Assert.assertEquals(expected, actual);
expected = new DatasourcePathSpec(
jsonMapper,
segments,
ingestionSpec,
null
);
@@ -161,46 +200,13 @@ public class DatasourcePathSpecTest
)
);
List<DataSegment> segments = ImmutableList.of(
new DataSegment(
ingestionSpec.getDataSource(),
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index.zip"
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12334
),
new DataSegment(
ingestionSpec.getDataSource(),
Interval.parse("2050/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", "/tmp/index.zip"
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12335
)
);
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator = EasyMock.createMock(IndexerMetadataStorageCoordinator.class);
EasyMock.expect(indexerMetadataStorageCoordinator.getUsedSegmentsForInterval(ingestionSpec.getDataSource(), ingestionSpec.getInterval())).andReturn(segments);
EasyMock.replay(indexerMetadataStorageCoordinator);
ObjectMapper mapper = new DefaultObjectMapper();
DatasourcePathSpec pathSpec = new DatasourcePathSpec(
indexerMetadataStorageCoordinator,
mapper,
segments,
ingestionSpec,
null
);

View File

@@ -17,10 +17,12 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -33,17 +35,22 @@ import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.Jobby;
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
public class HadoopIndexTask extends HadoopTask
@@ -56,10 +63,14 @@ public class HadoopIndexTask extends HadoopTask
}
@JsonIgnore
private final HadoopIngestionSpec spec;
private HadoopIngestionSpec spec;
@JsonIgnore
private final String classpathPrefix;
@JsonIgnore
private final ObjectMapper jsonMapper;
/**
* @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* for creating Druid index segments. It may be modified.
@@ -76,7 +87,8 @@ public class HadoopIndexTask extends HadoopTask
@JsonProperty("spec") HadoopIngestionSpec spec,
@JsonProperty("hadoopCoordinates") String hadoopCoordinates,
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
@JsonProperty("classpathPrefix") String classpathPrefix
@JsonProperty("classpathPrefix") String classpathPrefix,
@JacksonInject ObjectMapper jsonMapper
)
{
super(
@@ -102,6 +114,7 @@ public class HadoopIndexTask extends HadoopTask
);
this.classpathPrefix = classpathPrefix;
this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null ObjectMappper");
}
@Override
@@ -152,6 +165,11 @@ public class HadoopIndexTask extends HadoopTask
final ClassLoader loader = buildClassLoader(toolbox);
boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
spec = HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
spec,
jsonMapper,
new OverlordActionBasedUsedSegmentLister(toolbox));
final String config = invokeForeignLoader(
"io.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessing",
new String[]{

View File

@@ -0,0 +1,54 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.hadoop;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.inject.Inject;
import io.druid.indexer.path.UsedSegmentLister;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
/**
*/
public class OverlordActionBasedUsedSegmentLister implements UsedSegmentLister
{
private final TaskToolbox toolbox;
@Inject
public OverlordActionBasedUsedSegmentLister(TaskToolbox toolbox)
{
this.toolbox = Preconditions.checkNotNull(toolbox, "null task toolbox");
}
@Override
public List<DataSegment> getUsedSegmentsForInterval(
String dataSource, Interval interval
) throws IOException
{
return toolbox
.getTaskActionClient()
.submit(new SegmentListUsedAction(dataSource, interval));
}
}

View File

@@ -471,11 +471,15 @@ public class TaskSerdeTest
),
null,
null,
"blah"
"blah",
jsonMapper
);
final String json = jsonMapper.writeValueAsString(task);
final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.readValue(json, Task.class);
InjectableValues inject = new InjectableValues.Std()
.addValue(ObjectMapper.class, jsonMapper);
final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.reader(Task.class).with(inject).readValue(json);
Assert.assertEquals("foo", task.getDataSource());

View File

@@ -30,14 +30,24 @@ import com.google.inject.name.Names;
import com.metamx.common.logger.Logger;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.druid.guice.LazySingleton;
import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.JobHelper;
import io.druid.indexer.Jobby;
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.path.DatasourcePathSpec;
import io.druid.indexer.path.MetadataStoreBasedUsedSegmentLister;
import io.druid.indexer.path.MultiplePathSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import java.io.File;
import java.net.URI;
@@ -84,6 +94,10 @@ public class CliInternalHadoopIndexer extends GuiceRunnable
binder.bind(new TypeLiteral<Supplier<MetadataStorageConnectorConfig>>() {})
.toInstance(metadataSpec);
binder.bind(MetadataStorageTablesConfig.class).toInstance(metadataSpec.getMetadataStorageTablesConfig());
binder.bind(IndexerMetadataStorageCoordinator.class).to(IndexerSQLMetadataStorageCoordinator.class).in(
LazySingleton.class
);
}
}
);
@@ -95,11 +109,23 @@ public class CliInternalHadoopIndexer extends GuiceRunnable
try {
Injector injector = makeInjector();
MetadataStorageUpdaterJobSpec metadataSpec = getHadoopDruidIndexerConfig().getSchema().getIOConfig().getMetadataUpdateSpec();
config = getHadoopDruidIndexerConfig();
MetadataStorageUpdaterJobSpec metadataSpec = config.getSchema().getIOConfig().getMetadataUpdateSpec();
// override metadata storage type based on HadoopIOConfig
Preconditions.checkNotNull(metadataSpec.getType(), "type in metadataUpdateSpec must not be null");
injector.getInstance(Properties.class).setProperty("druid.metadata.storage.type", metadataSpec.getType());
config = HadoopDruidIndexerConfig.fromSpec(
HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
config.getSchema(),
HadoopDruidIndexerConfig.jsonMapper,
new MetadataStoreBasedUsedSegmentLister(
injector.getInstance(IndexerMetadataStorageCoordinator.class)
)
)
);
List<Jobby> jobs = Lists.newArrayList();
jobs.add(new HadoopDruidDetermineConfigurationJob(config));
jobs.add(new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class)));