mirror of https://github.com/apache/druid.git

Merge pull request #1861 from guobingkun/insert_segment_tool

insert-segment tool

This commit is contained in commit b1261035a7.

@@ -57,3 +57,8 @@ UIs
 * [mistercrunch/panoramix](https://github.com/mistercrunch/panoramix) - A web application to slice, dice and visualize data out of Druid
 * [grafana](https://github.com/Quantiply/grafana-plugins/tree/master/features/druid) - A plugin for [Grafana](http://grafana.org/)
 * [Pivot](https://github.com/implydata/pivot) - An exploratory analytics UI for Druid
+
+Tools
+---
+
+* [Insert Segments](../../operations/insert-segment-to-db.html) - A tool that can insert segments' metadata into Druid metadata storage.
@@ -0,0 +1,95 @@
---
layout: doc_page
---
# insert-segment-to-db Tool

`insert-segment-to-db` is a tool that can insert segments into Druid metadata storage. It is intended to be used
to update the segment table in metadata storage after segments have been manually migrated from one place to another.
It can also be used to insert missing segments into Druid, or even to recover the metadata storage by telling the tool
where the segments are stored.

Note: This tool expects the Druid cluster to be running in a "safe" mode, where there are no active tasks that could
interfere with the segments being inserted. Users can optionally bring down the cluster to make 100% sure nothing is
interfering.

In order to make it work, the user has to provide metadata storage credentials and the deep storage type through Java
JVM arguments or a runtime.properties file. Specifically, this tool needs to know:

`druid.metadata.storage.type`

`druid.metadata.storage.connector.connectURI`

`druid.metadata.storage.connector.user`

`druid.metadata.storage.connector.password`

`druid.storage.type`

Besides the properties above, you also need to specify the location where the segments are stored and whether you want
to update descriptor.json. These two can be provided through command line arguments.

`--workingDir` (Required)

The directory URI where segments are stored. This tool will recursively look for segments underneath this directory
and insert/update these segments in metadata storage.
Attention: workingDir must be a complete URI, which means it must be prefixed with a scheme. For example,
hdfs://hostname:port/segment_directory

`--updateDescriptor` (Optional)

If set to true, this tool will update the `loadSpec` field in `descriptor.json` if the path in `loadSpec` is different
from where `descriptor.json` was found. The default value is `true`. A sketch of the kind of rewrite this performs is
shown below.
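For illustration only, here is a hypothetical before/after `descriptor.json` fragment (the paths are invented for this
sketch and are not taken from the patch). When a segment has been moved and `--updateDescriptor` is enabled, the tool
rewrites a stale `loadSpec` so that it points at the `index.zip` it actually found next to the `descriptor.json`; for
HDFS the rewritten path is stored without the scheme prefix:

```
Before:  "loadSpec" : { "type" : "hdfs", "path" : "hdfs://old-host:1234/old/location/index.zip" }
After:   "loadSpec" : { "type" : "hdfs", "path" : "/druid/storage/wikipedia/2013-08-31T000000.000Z_2013-09-01T000000.000Z/2015-10-21T22_07_57.074Z/0/index.zip" }
```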
Note: you will also need to load different Druid extensions depending on the metadata and deep storage you use. For
example, if you use `mysql` as metadata storage and HDFS as deep storage, you should load the `mysql-metadata-storage`
and `druid-hdfs-storage` extensions.

Example:

Suppose your metadata storage is `mysql` and you've migrated some segments to a directory in HDFS that looks like this,

```
Directory path: /druid/storage/wikipedia

├── 2013-08-31T000000.000Z_2013-09-01T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│       └── 0
│           ├── descriptor.json
│           └── index.zip
├── 2013-09-01T000000.000Z_2013-09-02T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│       └── 0
│           ├── descriptor.json
│           └── index.zip
├── 2013-09-02T000000.000Z_2013-09-03T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│       └── 0
│           ├── descriptor.json
│           └── index.zip
└── 2013-09-03T000000.000Z_2013-09-04T000000.000Z
    └── 2015-10-21T22_07_57.074Z
        └── 0
            ├── descriptor.json
            └── index.zip
```

To load all these segments into `mysql`, you can run the command below:

```
java
-Ddruid.metadata.storage.type=mysql
-Ddruid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
-Ddruid.metadata.storage.connector.user=druid
-Ddruid.metadata.storage.connector.password=diurd
-Ddruid.extensions.loadList=[\"mysql-metadata-storage\",\"druid-hdfs-storage\"]
-Ddruid.storage.type=hdfs
-cp $DRUID_CLASSPATH
io.druid.cli.Main tools insert-segment --workingDir hdfs://host:port//druid/storage/wikipedia --updateDescriptor true
```

In this example, the `mysql` metadata storage settings and the deep storage type are provided through Java JVM
arguments; you can optionally put all of them in a runtime.properties file and include it in the Druid classpath (see
the sketch below). Note that we also include `mysql-metadata-storage` and `druid-hdfs-storage` in the extension list.

After running this command, the segments table in `mysql` should store the new location for each segment we just inserted.
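If you prefer the runtime.properties route mentioned above, a minimal sketch of such a file is shown below (the
property names are the ones listed earlier in this document; the values simply repeat the ones from this example).
With the file on the Druid classpath, the corresponding `-D` arguments can be dropped from the command:

```
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
druid.extensions.loadList=["mysql-metadata-storage","druid-hdfs-storage"]
druid.storage.type=hdfs
```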
@@ -0,0 +1,116 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.storage.hdfs;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

/**
 */
public class HdfsDataSegmentFinder implements DataSegmentFinder
{

  private static final Logger log = new Logger(HdfsDataSegmentFinder.class);

  private final Configuration config;
  private final ObjectMapper mapper;

  @Inject
  public HdfsDataSegmentFinder(Configuration config, ObjectMapper mapper)
  {
    this.config = config;
    this.mapper = mapper;
  }

  @Override
  public Set<DataSegment> findSegments(String workingDirPathStr, boolean updateDescriptor)
      throws SegmentLoadingException
  {
    final Set<DataSegment> segments = Sets.newHashSet();
    final Path workingDirPath = new Path(workingDirPathStr);
    FileSystem fs;
    try {
      fs = workingDirPath.getFileSystem(config);

      log.info(fs.getScheme());
      log.info("FileSystem URI:" + fs.getUri().toString());

      if (!fs.exists(workingDirPath)) {
        throw new SegmentLoadingException("Working directory [%s] doesn't exist.", workingDirPath);
      }

      if (!fs.isDirectory(workingDirPath)) {
        throw new SegmentLoadingException("Working directory [%s] is not a directory!?", workingDirPath);
      }

      final RemoteIterator<LocatedFileStatus> it = fs.listFiles(workingDirPath, true);
      while (it.hasNext()) {
        final LocatedFileStatus locatedFileStatus = it.next();
        final Path path = locatedFileStatus.getPath();
        if (path.getName().equals("descriptor.json")) {
          final Path indexZip = new Path(path.getParent(), "index.zip");
          if (fs.exists(indexZip)) {
            final DataSegment dataSegment = mapper.readValue(fs.open(path), DataSegment.class);
            log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip);

            final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
            final String pathWithoutScheme = indexZip.toUri().getPath();

            if (!loadSpec.get("type").equals(HdfsStorageDruidModule.SCHEME)
                || !loadSpec.get("path").equals(pathWithoutScheme)) {
              loadSpec.put("type", HdfsStorageDruidModule.SCHEME);
              loadSpec.put("path", pathWithoutScheme);
              if (updateDescriptor) {
                log.info("Updating loadSpec in descriptor.json at [%s] with new path [%s]", path, pathWithoutScheme);
                mapper.writeValue(fs.create(path, true), dataSegment);
              }
            }
            segments.add(dataSegment);
          } else {
            throw new SegmentLoadingException(
                "index.zip didn't exist at [%s] while descriptor.json exists!?",
                indexZip
            );
          }
        }
      }
    }
    catch (IOException e) {
      throw new SegmentLoadingException(e, "Problems interacting with filesystem[%s].", workingDirPath);
    }

    return segments;
  }

}
@@ -38,17 +38,21 @@ public class HdfsLoadSpec implements LoadSpec
 {
   private final Path path;
   final HdfsDataSegmentPuller puller;

   @JsonCreator
   public HdfsLoadSpec(
       @JacksonInject HdfsDataSegmentPuller puller,
       @JsonProperty(value = "path", required = true) String path
-  ){
+  )
+  {
     Preconditions.checkNotNull(path);
     this.path = new Path(path);
     this.puller = puller;
   }

   @JsonProperty("path")
-  public final String getPathString(){
+  public final String getPathString()
+  {
     return path.toString();
   }
@@ -91,6 +91,7 @@ public class HdfsStorageDruidModule implements DruidModule
     Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPuller.class).in(LazySingleton.class);
     Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPusher.class).in(LazySingleton.class);
     Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentKiller.class).in(LazySingleton.class);
+    Binders.dataSegmentFinderBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentFinder.class).in(LazySingleton.class);

     final Configuration conf = new Configuration();

@@ -103,9 +104,11 @@ public class HdfsStorageDruidModule implements DruidModule
     try {
       Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
       FileSystem.get(conf);
-    } catch(IOException ex) {
+    }
+    catch (IOException ex) {
       throw Throwables.propagate(ex);
-    } finally {
+    }
+    finally {
       Thread.currentThread().setContextClassLoader(currCtxCl);
     }
@@ -0,0 +1,287 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.segment.loading;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.storage.hdfs.HdfsDataSegmentFinder;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.joda.time.Interval;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.Set;

/**
 */
public class HdfsDataSegmentFinderTest
{

  private static final ObjectMapper mapper = new DefaultObjectMapper();
  private static final String DESCRIPTOR_JSON = "descriptor.json";
  private static final String INDEX_ZIP = "index.zip";

  private static final DataSegment SEGMENT_1 = DataSegment
      .builder()
      .dataSource("wikipedia")
      .interval(new Interval("2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z"))
      .version("2015-10-21T22:07:57.074Z")
      .loadSpec(
          ImmutableMap.<String, Object>of(
              "type", "hdfs",
              "path", "hdfs://abc.com:1234/somewhere/index.zip"
          )
      )
      .dimensions(ImmutableList.of("language", "page"))
      .metrics(ImmutableList.of("count"))
      .build();

  private static final DataSegment SEGMENT_2 = DataSegment
      .builder(SEGMENT_1)
      .interval(new Interval("2013-09-01T00:00:00.000Z/2013-09-02T00:00:00.000Z"))
      .build();

  private static final DataSegment SEGMENT_3 = DataSegment
      .builder(SEGMENT_1)
      .interval(new Interval("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
      .version("2015-10-22T22:07:57.074Z")
      .build();

  private static final DataSegment SEGMENT_4_0 = DataSegment
      .builder(SEGMENT_1)
      .interval(new Interval("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
      .shardSpec(new NumberedShardSpec(0, 2))
      .build();

  private static final DataSegment SEGMENT_4_1 = DataSegment
      .builder(SEGMENT_1)
      .interval(new Interval("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
      .shardSpec(new NumberedShardSpec(1, 2))
      .build();

  private static MiniDFSCluster miniCluster;
  private static File hdfsTmpDir;
  private static URI uriBase;
  private static Configuration conf;
  private static FileSystem fs;

  private Path dataSourceDir;
  private Path descriptor1;
  private Path descriptor2;
  private Path descriptor3;
  private Path descriptor4_0;
  private Path descriptor4_1;
  private Path indexZip1;
  private Path indexZip2;
  private Path indexZip3;
  private Path indexZip4_0;
  private Path indexZip4_1;

  @BeforeClass
  public static void setupStatic() throws IOException
  {
    mapper.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered"));

    hdfsTmpDir = File.createTempFile("hdfsDataSource", "dir");
    hdfsTmpDir.deleteOnExit();
    if (!hdfsTmpDir.delete()) {
      throw new IOException(String.format("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath()));
    }
    conf = new Configuration(true);
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath());
    miniCluster = new MiniDFSCluster.Builder(conf).build();
    uriBase = miniCluster.getURI();
    fs = miniCluster.getFileSystem();
  }

  @AfterClass
  public static void tearDownStatic()
  {
    if (miniCluster != null) {
      miniCluster.shutdown(true);
    }
  }

  @Before
  public void setUp() throws IOException
  {
    dataSourceDir = new Path(new Path(uriBase), "/usr/dataSource");
    descriptor1 = new Path(dataSourceDir, "interval1/v1/0/" + DESCRIPTOR_JSON);
    descriptor2 = new Path(dataSourceDir, "interval2/v1/0/" + DESCRIPTOR_JSON);
    descriptor3 = new Path(dataSourceDir, "interval3/v2/0/" + DESCRIPTOR_JSON);
    descriptor4_0 = new Path(dataSourceDir, "interval4/v1/0/" + DESCRIPTOR_JSON);
    descriptor4_1 = new Path(dataSourceDir, "interval4/v1/1/" + DESCRIPTOR_JSON);
    indexZip1 = new Path(descriptor1.getParent(), INDEX_ZIP);
    indexZip2 = new Path(descriptor2.getParent(), INDEX_ZIP);
    indexZip3 = new Path(descriptor3.getParent(), INDEX_ZIP);
    indexZip4_0 = new Path(descriptor4_0.getParent(), INDEX_ZIP);
    indexZip4_1 = new Path(descriptor4_1.getParent(), INDEX_ZIP);

    mapper.writeValue(fs.create(descriptor1), SEGMENT_1);
    mapper.writeValue(fs.create(descriptor2), SEGMENT_2);
    mapper.writeValue(fs.create(descriptor3), SEGMENT_3);
    mapper.writeValue(fs.create(descriptor4_0), SEGMENT_4_0);
    mapper.writeValue(fs.create(descriptor4_1), SEGMENT_4_1);

    create(indexZip1);
    create(indexZip2);
    create(indexZip3);
    create(indexZip4_0);
    create(indexZip4_1);
  }

  private void create(Path indexZip1) throws IOException
  {
    try (FSDataOutputStream os = fs.create(indexZip1)) {
    }
  }

  @Test
  public void testFindSegments() throws Exception
  {
    final HdfsDataSegmentFinder hdfsDataSegmentFinder = new HdfsDataSegmentFinder(conf, mapper);

    final Set<DataSegment> segments = hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), false);

    Assert.assertEquals(5, segments.size());

    DataSegment updatedSegment1 = null;
    DataSegment updatedSegment2 = null;
    DataSegment updatedSegment3 = null;
    DataSegment updatedSegment4_0 = null;
    DataSegment updatedSegment4_1 = null;
    for (DataSegment dataSegment : segments) {
      if (dataSegment.getIdentifier().equals(SEGMENT_1.getIdentifier())) {
        updatedSegment1 = dataSegment;
      } else if (dataSegment.getIdentifier().equals(SEGMENT_2.getIdentifier())) {
        updatedSegment2 = dataSegment;
      } else if (dataSegment.getIdentifier().equals(SEGMENT_3.getIdentifier())) {
        updatedSegment3 = dataSegment;
      } else if (dataSegment.getIdentifier().equals(SEGMENT_4_0.getIdentifier())) {
        updatedSegment4_0 = dataSegment;
      } else if (dataSegment.getIdentifier().equals(SEGMENT_4_1.getIdentifier())) {
        updatedSegment4_1 = dataSegment;
      } else {
        Assert.fail("Unexpected segment");
      }
    }

    Assert.assertEquals(descriptor1.toUri().getPath(), getDescriptorPath(updatedSegment1));
    Assert.assertEquals(descriptor2.toUri().getPath(), getDescriptorPath(updatedSegment2));
    Assert.assertEquals(descriptor3.toUri().getPath(), getDescriptorPath(updatedSegment3));
    Assert.assertEquals(descriptor4_0.toUri().getPath(), getDescriptorPath(updatedSegment4_0));
    Assert.assertEquals(descriptor4_1.toUri().getPath(), getDescriptorPath(updatedSegment4_1));

    final String serializedSegment1 = mapper.writeValueAsString(updatedSegment1);
    final String serializedSegment2 = mapper.writeValueAsString(updatedSegment2);
    final String serializedSegment3 = mapper.writeValueAsString(updatedSegment3);
    final String serializedSegment4_0 = mapper.writeValueAsString(updatedSegment4_0);
    final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1);

    // since updateDescriptor was not enabled, descriptor.json still has stale information
    Assert.assertNotEquals(serializedSegment1, readContent(descriptor1));
    Assert.assertNotEquals(serializedSegment2, readContent(descriptor2));
    Assert.assertNotEquals(serializedSegment3, readContent(descriptor3));
    Assert.assertNotEquals(serializedSegment4_0, readContent(descriptor4_0));
    Assert.assertNotEquals(serializedSegment4_1, readContent(descriptor4_1));

    // enable updateDescriptor so that descriptor.json will be updated to reflect the new loadSpec
    final Set<DataSegment> segments2 = hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), true);

    Assert.assertEquals(segments, segments2);
    Assert.assertEquals(serializedSegment1, readContent(descriptor1));
    Assert.assertEquals(serializedSegment2, readContent(descriptor2));
    Assert.assertEquals(serializedSegment3, readContent(descriptor3));
    Assert.assertEquals(serializedSegment4_0, readContent(descriptor4_0));
    Assert.assertEquals(serializedSegment4_1, readContent(descriptor4_1));
  }

  @Test(expected = SegmentLoadingException.class)
  public void testFindSegmentsFail() throws Exception
  {
    // remove one of the index.zip files while keeping its descriptor.json
    fs.delete(indexZip4_1, false);

    final HdfsDataSegmentFinder hdfsDataSegmentFinder = new HdfsDataSegmentFinder(conf, mapper);
    hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), false);
  }

  @Test(expected = SegmentLoadingException.class)
  public void testFindSegmentsFail2() throws SegmentLoadingException
  {
    // will fail to deserialize descriptor.json because DefaultObjectMapper doesn't recognize NumberedShardSpec
    final HdfsDataSegmentFinder hdfsDataSegmentFinder = new HdfsDataSegmentFinder(conf, new DefaultObjectMapper());
    try {
      hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), false);
    }
    catch (SegmentLoadingException e) {
      Assert.assertTrue(e.getCause() instanceof IOException);
      throw e;
    }
  }

  private String getDescriptorPath(DataSegment segment)
  {
    final Path indexzip = new Path(String.valueOf(segment.getLoadSpec().get("path")));
    return indexzip.getParent().toString() + "/" + DESCRIPTOR_JSON;
  }

  private String readContent(Path descriptor) throws IOException
  {
    final FSDataInputStream is = fs.open(descriptor);
    final String content = IOUtils.toString(is);
    is.close();
    return content;
  }
}
@@ -26,8 +26,10 @@ import com.google.inject.Key;
 import com.google.inject.multibindings.MapBinder;
 import io.druid.data.SearchableVersionedDataFinder;
 import io.druid.initialization.DruidModule;
+import io.druid.segment.loading.DataSegmentFinder;
 import io.druid.segment.loading.DataSegmentKiller;
 import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.loading.LocalDataSegmentFinder;
 import io.druid.segment.loading.LocalDataSegmentKiller;
 import io.druid.segment.loading.LocalDataSegmentPuller;
 import io.druid.segment.loading.LocalDataSegmentPusher;

@@ -55,6 +57,8 @@ public class LocalDataStorageDruidModule implements DruidModule
     PolyBind.createChoice(
         binder, "druid.storage.type", Key.get(DataSegmentPusher.class), Key.get(LocalDataSegmentPusher.class)
     );
+
+    PolyBind.createChoice(binder, "druid.storage.type", Key.get(DataSegmentFinder.class), null);
   }

   private static void bindDeepStorageLocal(Binder binder)

@@ -79,6 +83,11 @@ public class LocalDataStorageDruidModule implements DruidModule
         .to(LocalDataSegmentPusher.class)
         .in(LazySingleton.class);

+    PolyBind.optionBinder(binder, Key.get(DataSegmentFinder.class))
+            .addBinding(SCHEME)
+            .to(LocalDataSegmentFinder.class)
+            .in(LazySingleton.class);
+
     JsonConfigProvider.bind(binder, "druid.storage", LocalDataSegmentPusherConfig.class);
   }
@@ -0,0 +1,108 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.segment.loading;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Sets;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.guice.LocalDataStorageDruidModule;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Set;

/**
 */
public class LocalDataSegmentFinder implements DataSegmentFinder
{

  private static final Logger log = new Logger(LocalDataSegmentFinder.class);

  private final ObjectMapper mapper;

  @Inject
  public LocalDataSegmentFinder(ObjectMapper mapper)
  {
    this.mapper = mapper;
  }

  @Override
  public Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor)
      throws SegmentLoadingException
  {
    final Set<DataSegment> segments = Sets.newHashSet();
    final File workingDir = new File(workingDirPath);
    if (!workingDir.isDirectory()) {
      throw new SegmentLoadingException("Working directory [%s] didn't exist!?", workingDir);
    }
    recursiveSearchSegments(segments, workingDir, updateDescriptor);
    return segments;
  }

  private void recursiveSearchSegments(Set<DataSegment> segments, File workingDir, boolean updateDescriptor)
      throws SegmentLoadingException
  {
    for (File file : workingDir.listFiles()) {
      if (file.isDirectory()) {
        recursiveSearchSegments(segments, file, updateDescriptor);
      } else if (file.getName().equals("descriptor.json")) {
        final File indexZip = new File(file.getParentFile(), "index.zip");
        if (indexZip.exists()) {
          try {
            final DataSegment dataSegment = mapper.readValue(FileUtils.readFileToString(file), DataSegment.class);
            log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip.getAbsoluteFile());
            final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
            if (!loadSpec.get("type").equals(LocalDataStorageDruidModule.SCHEME)
                || !loadSpec.get("path").equals(indexZip.getAbsolutePath())) {
              loadSpec.put("type", LocalDataStorageDruidModule.SCHEME);
              loadSpec.put("path", indexZip.getAbsolutePath());
              if (updateDescriptor) {
                log.info(
                    "Updating loadSpec in descriptor.json at [%s] with new path [%s]",
                    file.getAbsolutePath(),
                    indexZip.toString()
                );
                FileUtils.writeStringToFile(file, mapper.writeValueAsString(dataSegment));
              }
            }
            segments.add(dataSegment);
          }
          catch (IOException e) {
            throw new SegmentLoadingException(
                e,
                "Failed to read descriptor.json for segment located at [%s]",
                file.getAbsoluteFile()
            );
          }
        } else {
          throw new SegmentLoadingException(
              "index.zip didn't exist at [%s] while descriptor.json exists!?",
              indexZip.getAbsoluteFile()
          );
        }
      }
    }
  }
}
@@ -0,0 +1,252 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.segment.loading;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Set;

/**
 */
public class LocalDataSegmentFinderTest
{

  private static final ObjectMapper mapper = new DefaultObjectMapper();
  private static final String DESCRIPTOR_JSON = "descriptor.json";
  private static final String INDEX_ZIP = "index.zip";

  private static final DataSegment SEGMENT_1 = DataSegment
      .builder()
      .dataSource("wikipedia")
      .interval(new Interval("2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z"))
      .version("2015-10-21T22:07:57.074Z")
      .loadSpec(
          ImmutableMap.<String, Object>of(
              "type", "local",
              "path", "/tmp/somewhere/index.zip"
          )
      )
      .dimensions(ImmutableList.of("language", "page"))
      .metrics(ImmutableList.of("count"))
      .build();

  private static final DataSegment SEGMENT_2 = DataSegment
      .builder(SEGMENT_1)
      .interval(new Interval("2013-09-01T00:00:00.000Z/2013-09-02T00:00:00.000Z"))
      .build();

  private static final DataSegment SEGMENT_3 = DataSegment
      .builder(SEGMENT_1)
      .interval(new Interval("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
      .version("2015-10-22T22:07:57.074Z")
      .build();

  private static final DataSegment SEGMENT_4_0 = DataSegment
      .builder(SEGMENT_1)
      .interval(new Interval("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
      .shardSpec(new NumberedShardSpec(0, 2))
      .build();

  private static final DataSegment SEGMENT_4_1 = DataSegment
      .builder(SEGMENT_1)
      .interval(new Interval("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
      .shardSpec(new NumberedShardSpec(1, 2))
      .build();

  @Rule
  public final TemporaryFolder temporaryFolder = new TemporaryFolder();

  private File dataSourceDir;
  private File descriptor1;
  private File descriptor2;
  private File descriptor3;
  private File descriptor4_0;
  private File descriptor4_1;
  private File indexZip1;
  private File indexZip2;
  private File indexZip3;
  private File indexZip4_0;
  private File indexZip4_1;

  @BeforeClass
  public static void setUpStatic()
  {
    mapper.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered"));
  }

  @Before
  public void setUp() throws Exception
  {
    dataSourceDir = temporaryFolder.newFolder();
    descriptor1 = new File(dataSourceDir.getAbsolutePath() + "/interval1/v1/0", DESCRIPTOR_JSON);
    descriptor2 = new File(dataSourceDir.getAbsolutePath() + "/interval2/v1/0", DESCRIPTOR_JSON);
    descriptor3 = new File(dataSourceDir.getAbsolutePath() + "/interval3/v2/0", DESCRIPTOR_JSON);
    descriptor4_0 = new File(dataSourceDir.getAbsolutePath() + "/interval4/v1/0", DESCRIPTOR_JSON);
    descriptor4_1 = new File(dataSourceDir.getAbsolutePath() + "/interval4/v1/1", DESCRIPTOR_JSON);

    descriptor1.getParentFile().mkdirs();
    descriptor2.getParentFile().mkdirs();
    descriptor3.getParentFile().mkdirs();
    descriptor4_0.getParentFile().mkdirs();
    descriptor4_1.getParentFile().mkdirs();

    mapper.writeValue(descriptor1, SEGMENT_1);
    mapper.writeValue(descriptor2, SEGMENT_2);
    mapper.writeValue(descriptor3, SEGMENT_3);
    mapper.writeValue(descriptor4_0, SEGMENT_4_0);
    mapper.writeValue(descriptor4_1, SEGMENT_4_1);

    indexZip1 = new File(descriptor1.getParentFile(), INDEX_ZIP);
    indexZip2 = new File(descriptor2.getParentFile(), INDEX_ZIP);
    indexZip3 = new File(descriptor3.getParentFile(), INDEX_ZIP);
    indexZip4_0 = new File(descriptor4_0.getParentFile(), INDEX_ZIP);
    indexZip4_1 = new File(descriptor4_1.getParentFile(), INDEX_ZIP);

    indexZip1.createNewFile();
    indexZip2.createNewFile();
    indexZip3.createNewFile();
    indexZip4_0.createNewFile();
    indexZip4_1.createNewFile();
  }

  @Test
  public void testFindSegments() throws SegmentLoadingException, IOException
  {
    final LocalDataSegmentFinder localDataSegmentFinder = new LocalDataSegmentFinder(mapper);

    final Set<DataSegment> segments = localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), false);

    Assert.assertEquals(5, segments.size());

    DataSegment updatedSegment1 = null;
    DataSegment updatedSegment2 = null;
    DataSegment updatedSegment3 = null;
    DataSegment updatedSegment4_0 = null;
    DataSegment updatedSegment4_1 = null;
    for (DataSegment dataSegment : segments) {
      if (dataSegment.getIdentifier().equals(SEGMENT_1.getIdentifier())) {
        updatedSegment1 = dataSegment;
      } else if (dataSegment.getIdentifier().equals(SEGMENT_2.getIdentifier())) {
        updatedSegment2 = dataSegment;
      } else if (dataSegment.getIdentifier().equals(SEGMENT_3.getIdentifier())) {
        updatedSegment3 = dataSegment;
      } else if (dataSegment.getIdentifier().equals(SEGMENT_4_0.getIdentifier())) {
        updatedSegment4_0 = dataSegment;
      } else if (dataSegment.getIdentifier().equals(SEGMENT_4_1.getIdentifier())) {
        updatedSegment4_1 = dataSegment;
      } else {
        Assert.fail("Unexpected segment");
      }
    }

    Assert.assertEquals(descriptor1.getAbsolutePath(), getDescriptorPath(updatedSegment1));
    Assert.assertEquals(descriptor2.getAbsolutePath(), getDescriptorPath(updatedSegment2));
    Assert.assertEquals(descriptor3.getAbsolutePath(), getDescriptorPath(updatedSegment3));
    Assert.assertEquals(descriptor4_0.getAbsolutePath(), getDescriptorPath(updatedSegment4_0));
    Assert.assertEquals(descriptor4_1.getAbsolutePath(), getDescriptorPath(updatedSegment4_1));

    final String serializedSegment1 = mapper.writeValueAsString(updatedSegment1);
    final String serializedSegment2 = mapper.writeValueAsString(updatedSegment2);
    final String serializedSegment3 = mapper.writeValueAsString(updatedSegment3);
    final String serializedSegment4_0 = mapper.writeValueAsString(updatedSegment4_0);
    final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1);

    // since updateDescriptor was not enabled, descriptor.json still has stale information
    Assert.assertNotEquals(serializedSegment1, FileUtils.readFileToString(descriptor1));
    Assert.assertNotEquals(serializedSegment2, FileUtils.readFileToString(descriptor2));
    Assert.assertNotEquals(serializedSegment3, FileUtils.readFileToString(descriptor3));
    Assert.assertNotEquals(serializedSegment4_0, FileUtils.readFileToString(descriptor4_0));
    Assert.assertNotEquals(serializedSegment4_1, FileUtils.readFileToString(descriptor4_1));

    // enable updateDescriptor so that descriptor.json will be updated to reflect the new loadSpec
    final Set<DataSegment> segments2 = localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), true);

    Assert.assertEquals(segments, segments2);
    Assert.assertEquals(serializedSegment1, FileUtils.readFileToString(descriptor1));
    Assert.assertEquals(serializedSegment2, FileUtils.readFileToString(descriptor2));
    Assert.assertEquals(serializedSegment3, FileUtils.readFileToString(descriptor3));
    Assert.assertEquals(serializedSegment4_0, FileUtils.readFileToString(descriptor4_0));
    Assert.assertEquals(serializedSegment4_1, FileUtils.readFileToString(descriptor4_1));
  }

  private String getDescriptorPath(DataSegment segment)
  {
    final File indexzip = new File(String.valueOf(segment.getLoadSpec().get("path")));
    return indexzip.getParent() + "/" + DESCRIPTOR_JSON;
  }

  @Test(expected = SegmentLoadingException.class)
  public void testFindSegmentsFail() throws SegmentLoadingException
  {
    // remove one of the index.zip files while keeping its descriptor.json
    indexZip4_1.delete();

    final LocalDataSegmentFinder localDataSegmentFinder = new LocalDataSegmentFinder(mapper);
    localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), false);
  }

  @Test(expected = SegmentLoadingException.class)
  public void testFindSegmentsFail2() throws SegmentLoadingException
  {
    // will fail to deserialize descriptor.json because DefaultObjectMapper doesn't recognize NumberedShardSpec
    final LocalDataSegmentFinder localDataSegmentFinder = new LocalDataSegmentFinder(new DefaultObjectMapper());
    try {
      localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), false);
    }
    catch (SegmentLoadingException e) {
      Assert.assertTrue(e.getCause() instanceof IOException);
      throw e;
    }
  }
}
@@ -0,0 +1,138 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.cli;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.logger.Logger;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Self;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 */
@Command(
    name = "insert-segment-to-db",
    description = "insert a segment into metadata storage"
)
public class InsertSegment extends GuiceRunnable
{
  private static final Logger log = new Logger(InsertSegment.class);

  @Option(name = "--workingDir", description = "The directory path where segments are stored. This tool will recursively look for segments underneath this directory and insert/update these segments in metadata storage.", required = true)
  private String workingDirPath;

  @Option(name = "--updateDescriptor", description = "if set to true, this tool will update loadSpec field in descriptor.json if the path in loadSpec is different from where descriptor.json was found. Default value is true", required = false)
  private boolean updateDescriptor = true;

  private ObjectMapper mapper;
  private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;

  public InsertSegment()
  {
    super(log);
  }

  @Override
  protected List<? extends Module> getModules()
  {
    return ImmutableList.<Module>of(
        new Module()
        {
          @Override
          public void configure(Binder binder)
          {
            JsonConfigProvider.bindInstance(
                binder, Key.get(DruidNode.class, Self.class), new DruidNode("tools", "localhost", -1)
            );
          }
        }
    );
  }

  @Override
  public void run()
  {
    final Injector injector = makeInjector();
    mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
    indexerMetadataStorageCoordinator = injector.getInstance(IndexerMetadataStorageCoordinator.class);
    final DataSegmentFinder dataSegmentFinder = injector.getInstance(DataSegmentFinder.class);

    log.info("Start searching segments under [%s]", workingDirPath);

    Set<DataSegment> segments = null;
    try {
      segments = dataSegmentFinder.findSegments(workingDirPath, updateDescriptor);
    }
    catch (SegmentLoadingException e) {
      Throwables.propagate(e);
    }

    log.info(
        "Done searching segments under [%s], [%d] segments were found",
        workingDirPath,
        segments.size()
    );

    try {
      insertSegments(segments);
    }
    catch (IOException e) {
      Throwables.propagate(e);
    }

    log.info("Done processing [%d] segments", segments.size());
  }

  private void insertSegments(final Set<DataSegment> segments) throws IOException
  {
    final Set<DataSegment> segmentsInserted = indexerMetadataStorageCoordinator.announceHistoricalSegments(segments);
    for (DataSegment dataSegment : segmentsInserted) {
      log.info("Successfully inserted Segment [%s] into metadata storage", dataSegment.getIdentifier());
    }
    final Set<DataSegment> segmentsAlreadyExist = Sets.difference(segments, segmentsInserted);
    if (!segmentsAlreadyExist.isEmpty()) {
      for (DataSegment dataSegment : segmentsAlreadyExist) {
        log.info("Segment [%s] already exists in metadata storage, updating the payload", dataSegment.getIdentifier());
      }
      indexerMetadataStorageCoordinator.updateSegmentMetadata(segmentsAlreadyExist);
    }
  }

}
@@ -69,7 +69,13 @@ public class Main
         builder.withGroup("tools")
                .withDescription("Various tools for working with Druid")
                .withDefaultCommand(Help.class)
-               .withCommands(ConvertProperties.class, DruidJsonValidator.class, PullDependencies.class, CreateTables.class);
+               .withCommands(
+                   ConvertProperties.class,
+                   DruidJsonValidator.class,
+                   PullDependencies.class,
+                   CreateTables.class,
+                   InsertSegment.class
+               );

         builder.withGroup("index")
                .withDescription("Run indexing for druid")