DataSegmentFinder tool

`insert-segment-to-db` is a tool that can insert segments into Druid metadata storage. It is intended to be used
to update the segment table in metadata storage after people manually migrate segments from one place to another.
It can also be used to insert missing segments into Druid, or even to recover metadata storage by telling it where the
segments are stored.

Note: This tool expects users to have the Druid cluster running in a "safe" mode, where there are no active tasks that interfere
with the segments being inserted. Users can optionally bring down the cluster to make 100% sure nothing is interfering.
Bingkun Guo 2015-10-25 20:44:43 -05:00
parent e6c2db89da
commit 89b477970f
11 changed files with 1028 additions and 5 deletions

View File

@ -57,3 +57,8 @@ UIs
* [mistercrunch/panoramix](https://github.com/mistercrunch/panoramix) - A web application to slice, dice and visualize data out of Druid
* [grafana](https://github.com/Quantiply/grafana-plugins/tree/master/features/druid) - A plugin for [Grafana](http://grafana.org/)
* [Pivot](https://github.com/implydata/pivot) - An exploratory analytics UI for Druid
Tools
---
* [Insert Segments](../../operations/insert-segment-to-db.html) - A tool that can insert segments' metadata into Druid metadata storage.

View File

@ -0,0 +1,95 @@
---
layout: doc_page
---
# insert-segment-to-db Tool
`insert-segment-to-db` is a tool that can insert segments into Druid metadata storage. It is intended to be used
to update the segment table in metadata storage after people manually migrate segments from one place to another.
It can also be used to insert missing segments into Druid, or even to recover metadata storage by telling it where the
segments are stored.

Note: This tool expects users to have the Druid cluster running in a "safe" mode, where there are no active tasks that interfere
with the segments being inserted. Users can optionally bring down the cluster to make 100% sure nothing is interfering.

To make it work, users will have to provide metadata storage credentials and the deep storage type through Java JVM arguments
or a runtime.properties file. Specifically, this tool needs to know:

`druid.metadata.storage.type`

`druid.metadata.storage.connector.connectURI`

`druid.metadata.storage.connector.user`

`druid.metadata.storage.connector.password`

`druid.storage.type`

Besides the properties above, you also need to specify the location where the segments are stored and whether you want to
update descriptor.json. These two can be provided through command-line arguments.

`--workingDir` (Required)

The directory URI where segments are stored. This tool will recursively look for segments underneath this directory
and insert/update these segments in metadata storage.
Attention: workingDir must be a complete URI, which means it must be prefixed with a scheme. For example,
hdfs://hostname:port/segment_directory

`--updateDescriptor` (Optional)

If set to `true`, this tool will update the `loadSpec` field in `descriptor.json` if the path in `loadSpec` is different from
where `descriptor.json` was found. The default value is `true`.

Note: you will also need to load the Druid extensions that match the metadata storage and deep storage you use. For example, if you
use `mysql` as metadata storage and `HDFS` as deep storage, you should load the `mysql-metadata-storage` and `druid-hdfs-storage`
extensions.
Example:

Suppose your metadata storage is `mysql` and you've migrated some segments to a directory in HDFS, and that directory looks
like this:
```
Directory path: /druid/storage/wikipedia
├── 2013-08-31T000000.000Z_2013-09-01T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│       └── 0
│           ├── descriptor.json
│           └── index.zip
├── 2013-09-01T000000.000Z_2013-09-02T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│       └── 0
│           ├── descriptor.json
│           └── index.zip
├── 2013-09-02T000000.000Z_2013-09-03T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│       └── 0
│           ├── descriptor.json
│           └── index.zip
└── 2013-09-03T000000.000Z_2013-09-04T000000.000Z
    └── 2015-10-21T22_07_57.074Z
        └── 0
            ├── descriptor.json
            └── index.zip
```
To load all these segments into `mysql`, you can run the command below:
```
java
-Ddruid.metadata.storage.type=mysql
-Ddruid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
-Ddruid.metadata.storage.connector.user=druid
-Ddruid.metadata.storage.connector.password=diurd
-Ddruid.extensions.loadList=[\"mysql-metadata-storage\",\"druid-hdfs-storage\"]
-Ddruid.storage.type=hdfs
-cp $DRUID_CLASSPATH
io.druid.cli.Main tools insert-segment --workingDir hdfs://host:port//druid/storage/wikipedia --updateDescriptor true
```
In this example, the metadata storage type (`mysql`) and the deep storage type are provided through Java JVM arguments; you can optionally put all
of them in a runtime.properties file and include it in the Druid classpath. Note that we also include `mysql-metadata-storage`
and `druid-hdfs-storage` in the extension list.
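If you prefer the runtime.properties route mentioned above, a file with the equivalent settings might look like the following sketch; the values simply mirror the `-D` flags from this example.

```
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
druid.extensions.loadList=["mysql-metadata-storage","druid-hdfs-storage"]
druid.storage.type=hdfs
```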
After running this command, the segments table in `mysql` should store the new location for each segment we just inserted.

View File

@ -0,0 +1,116 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.hdfs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
/**
*/
public class HdfsDataSegmentFinder implements DataSegmentFinder
{
private static final Logger log = new Logger(HdfsDataSegmentFinder.class);
private final Configuration config;
private final ObjectMapper mapper;
@Inject
public HdfsDataSegmentFinder(Configuration config, ObjectMapper mapper)
{
this.config = config;
this.mapper = mapper;
}
@Override
public Set<DataSegment> findSegments(String workingDirPathStr, boolean updateDescriptor)
throws SegmentLoadingException
{
final Set<DataSegment> segments = Sets.newHashSet();
final Path workingDirPath = new Path(workingDirPathStr);
FileSystem fs;
try {
fs = workingDirPath.getFileSystem(config);
log.info(fs.getScheme());
log.info("FileSystem URI:" + fs.getUri().toString());
if (!fs.exists(workingDirPath)) {
throw new SegmentLoadingException("Working directory [%s] doesn't exist.", workingDirPath);
}
if (!fs.isDirectory(workingDirPath)) {
throw new SegmentLoadingException("Working directory [%s] is not a directory!?", workingDirPath);
}
final RemoteIterator<LocatedFileStatus> it = fs.listFiles(workingDirPath, true);
while (it.hasNext()) {
final LocatedFileStatus locatedFileStatus = it.next();
final Path path = locatedFileStatus.getPath();
if (path.getName().equals("descriptor.json")) {
final Path indexZip = new Path(path.getParent(), "index.zip");
if (fs.exists(indexZip)) {
final DataSegment dataSegment = mapper.readValue(fs.open(path), DataSegment.class);
log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip);
final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
final String pathWithoutScheme = indexZip.toUri().getPath();
if (!loadSpec.get("type").equals(HdfsStorageDruidModule.SCHEME) || !loadSpec.get("path")
.equals(pathWithoutScheme)) {
loadSpec.put("type", HdfsStorageDruidModule.SCHEME);
loadSpec.put("path", pathWithoutScheme);
if (updateDescriptor) {
log.info("Updating loadSpec in descriptor.json at [%s] with new path [%s]", path, pathWithoutScheme);
mapper.writeValue(fs.create(path, true), dataSegment);
}
}
segments.add(dataSegment);
} else {
throw new SegmentLoadingException(
"index.zip didn't exist at [%s] while descripter.json exists!?",
indexZip
);
}
}
}
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Problems interacting with filesystem[%s].", workingDirPath);
}
return segments;
}
}
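
A minimal, hypothetical sketch of calling this finder directly is shown below; inside the tool the instance is built by Guice instead. The class name `FindHdfsSegmentsExample` and the working-directory URI are illustrative only, and in practice the mapper may also need shard-spec subtypes registered, as the tests later in this commit do.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.storage.hdfs.HdfsDataSegmentFinder;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;

import java.util.Set;

public class FindHdfsSegmentsExample
{
  public static void main(String[] args) throws Exception
  {
    // The finder only needs a Hadoop Configuration and a Jackson mapper that understands DataSegment.
    final Configuration conf = new Configuration();
    final ObjectMapper mapper = new DefaultObjectMapper();
    final HdfsDataSegmentFinder finder = new HdfsDataSegmentFinder(conf, mapper);

    // false: report segments but leave descriptor.json untouched (the tool's --updateDescriptor flag).
    final Set<DataSegment> segments = finder.findSegments("hdfs://host:port/druid/storage/wikipedia", false);
    for (DataSegment segment : segments) {
      System.out.println(segment.getIdentifier());
    }
  }
}
```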

View File

@ -38,17 +38,21 @@ public class HdfsLoadSpec implements LoadSpec
{
private final Path path;
final HdfsDataSegmentPuller puller;
@JsonCreator
public HdfsLoadSpec(
@JacksonInject HdfsDataSegmentPuller puller,
@JsonProperty(value = "path", required = true) String path
)
{
Preconditions.checkNotNull(path);
this.path = new Path(path);
this.puller = puller;
}
@JsonProperty("path")
public final String getPathString()
{
return path.toString();
}

View File

@ -91,6 +91,7 @@ public class HdfsStorageDruidModule implements DruidModule
Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPuller.class).in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPusher.class).in(LazySingleton.class);
Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentKiller.class).in(LazySingleton.class);
Binders.dataSegmentFinderBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentFinder.class).in(LazySingleton.class);
final Configuration conf = new Configuration();
@ -103,9 +104,11 @@ public class HdfsStorageDruidModule implements DruidModule
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
FileSystem.get(conf);
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);
}

View File

@ -0,0 +1,287 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.storage.hdfs.HdfsDataSegmentFinder;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.joda.time.Interval;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.Set;
/**
*/
public class HdfsDataSegmentFinderTest
{
private static final ObjectMapper mapper = new DefaultObjectMapper();
private static final String DESCRIPTOR_JSON = "descriptor.json";
private static final String INDEX_ZIP = "index.zip";
private static final DataSegment SEGMENT_1 = DataSegment.builder()
.dataSource("wikipedia")
.interval(
new Interval(
"2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z"
)
)
.version("2015-10-21T22:07:57.074Z")
.loadSpec(
ImmutableMap.<String, Object>of(
"type",
"hdfs",
"path",
"hdfs://abc.com:1234/somewhere/index.zip"
)
)
.dimensions(ImmutableList.of("language", "page"))
.metrics(ImmutableList.of("count"))
.build();
private static final DataSegment SEGMENT_2 = DataSegment.builder(SEGMENT_1)
.interval(
new Interval(
"2013-09-01T00:00:00.000Z/2013-09-02T00:00:00.000Z"
)
)
.build();
private static final DataSegment SEGMENT_3 = DataSegment.builder(SEGMENT_1)
.interval(
new Interval(
"2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"
)
)
.version("2015-10-22T22:07:57.074Z")
.build();
private static final DataSegment SEGMENT_4_0 = DataSegment.builder(SEGMENT_1)
.interval(
new Interval(
"2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"
)
)
.shardSpec(new NumberedShardSpec(0, 2))
.build();
private static final DataSegment SEGMENT_4_1 = DataSegment.builder(SEGMENT_1)
.interval(
new Interval(
"2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"
)
)
.shardSpec(new NumberedShardSpec(1, 2))
.build();
private static MiniDFSCluster miniCluster;
private static File hdfsTmpDir;
private static URI uriBase;
private static Configuration conf;
private static FileSystem fs;
private Path dataSourceDir;
private Path descriptor1;
private Path descriptor2;
private Path descriptor3;
private Path descriptor4_0;
private Path descriptor4_1;
private Path indexZip1;
private Path indexZip2;
private Path indexZip3;
private Path indexZip4_0;
private Path indexZip4_1;
@BeforeClass
public static void setupStatic() throws IOException
{
mapper.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered"));
hdfsTmpDir = File.createTempFile("hdfsDataSource", "dir");
hdfsTmpDir.deleteOnExit();
if (!hdfsTmpDir.delete()) {
throw new IOException(String.format("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath()));
}
conf = new Configuration(true);
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath());
miniCluster = new MiniDFSCluster.Builder(conf).build();
uriBase = miniCluster.getURI();
fs = miniCluster.getFileSystem();
}
@AfterClass
public static void tearDownStatic()
{
if (miniCluster != null) {
miniCluster.shutdown(true);
}
}
@Before
public void setUp() throws IOException
{
dataSourceDir = new Path(new Path(uriBase), "/usr/dataSource");
descriptor1 = new Path(dataSourceDir, "interval1/v1/0/" + DESCRIPTOR_JSON);
descriptor2 = new Path(dataSourceDir, "interval2/v1/0/" + DESCRIPTOR_JSON);
descriptor3 = new Path(dataSourceDir, "interval3/v2/0/" + DESCRIPTOR_JSON);
descriptor4_0 = new Path(dataSourceDir, "interval4/v1/0/" + DESCRIPTOR_JSON);
descriptor4_1 = new Path(dataSourceDir, "interval4/v1/1/" + DESCRIPTOR_JSON);
indexZip1 = new Path(descriptor1.getParent(), INDEX_ZIP);
indexZip2 = new Path(descriptor2.getParent(), INDEX_ZIP);
indexZip3 = new Path(descriptor3.getParent(), INDEX_ZIP);
indexZip4_0 = new Path(descriptor4_0.getParent(), INDEX_ZIP);
indexZip4_1 = new Path(descriptor4_1.getParent(), INDEX_ZIP);
mapper.writeValue(fs.create(descriptor1), SEGMENT_1);
mapper.writeValue(fs.create(descriptor2), SEGMENT_2);
mapper.writeValue(fs.create(descriptor3), SEGMENT_3);
mapper.writeValue(fs.create(descriptor4_0), SEGMENT_4_0);
mapper.writeValue(fs.create(descriptor4_1), SEGMENT_4_1);
create(indexZip1);
create(indexZip2);
create(indexZip3);
create(indexZip4_0);
create(indexZip4_1);
}
private void create(Path path) throws IOException
{
// create an empty index.zip placeholder next to the descriptor
try (FSDataOutputStream os = fs.create(path)) {
}
}
@Test
public void testFindSegments() throws Exception
{
final HdfsDataSegmentFinder hdfsDataSegmentFinder = new HdfsDataSegmentFinder(conf, mapper);
final Set<DataSegment> segments = hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), false);
Assert.assertEquals(5, segments.size());
DataSegment updatedSegment1 = null;
DataSegment updatedSegment2 = null;
DataSegment updatedSegment3 = null;
DataSegment updatedSegment4_0 = null;
DataSegment updatedSegment4_1 = null;
for (DataSegment dataSegment : segments) {
if (dataSegment.getIdentifier().equals(SEGMENT_1.getIdentifier())) {
updatedSegment1 = dataSegment;
} else if (dataSegment.getIdentifier().equals(SEGMENT_2.getIdentifier())) {
updatedSegment2 = dataSegment;
} else if (dataSegment.getIdentifier().equals(SEGMENT_3.getIdentifier())) {
updatedSegment3 = dataSegment;
} else if (dataSegment.getIdentifier().equals(SEGMENT_4_0.getIdentifier())) {
updatedSegment4_0 = dataSegment;
} else if (dataSegment.getIdentifier().equals(SEGMENT_4_1.getIdentifier())) {
updatedSegment4_1 = dataSegment;
} else {
Assert.fail("Unexpected segment");
}
}
Assert.assertEquals(descriptor1.toUri().getPath(), getDescriptorPath(updatedSegment1));
Assert.assertEquals(descriptor2.toUri().getPath(), getDescriptorPath(updatedSegment2));
Assert.assertEquals(descriptor3.toUri().getPath(), getDescriptorPath(updatedSegment3));
Assert.assertEquals(descriptor4_0.toUri().getPath(), getDescriptorPath(updatedSegment4_0));
Assert.assertEquals(descriptor4_1.toUri().getPath(), getDescriptorPath(updatedSegment4_1));
final String serializedSegment1 = mapper.writeValueAsString(updatedSegment1);
final String serializedSegment2 = mapper.writeValueAsString(updatedSegment2);
final String serializedSegment3 = mapper.writeValueAsString(updatedSegment3);
final String serializedSegment4_0 = mapper.writeValueAsString(updatedSegment4_0);
final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1);
// since updateDescriptor was not enabled, descriptor.json still has stale information
Assert.assertNotEquals(serializedSegment1, readContent(descriptor1));
Assert.assertNotEquals(serializedSegment2, readContent(descriptor2));
Assert.assertNotEquals(serializedSegment3, readContent(descriptor3));
Assert.assertNotEquals(serializedSegment4_0, readContent(descriptor4_0));
Assert.assertNotEquals(serializedSegment4_1, readContent(descriptor4_1));
// enable updateDescriptor so that descriptor.json will be updated to reflect the new loadSpec
final Set<DataSegment> segments2 = hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), true);
Assert.assertEquals(segments, segments2);
Assert.assertEquals(serializedSegment1, readContent(descriptor1));
Assert.assertEquals(serializedSegment2, readContent(descriptor2));
Assert.assertEquals(serializedSegment3, readContent(descriptor3));
Assert.assertEquals(serializedSegment4_0, readContent(descriptor4_0));
Assert.assertEquals(serializedSegment4_1, readContent(descriptor4_1));
}
@Test(expected = SegmentLoadingException.class)
public void testFindSegmentsFail() throws Exception
{
// remove one of the index.zip files while keeping its descriptor.json
fs.delete(indexZip4_1, false);
final HdfsDataSegmentFinder hdfsDataSegmentFinder = new HdfsDataSegmentFinder(conf, mapper);
hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), false);
}
@Test(expected = SegmentLoadingException.class)
public void testFindSegmentsFail2() throws SegmentLoadingException
{
// will fail to deserialize descriptor.json because DefaultObjectMapper doesn't recognize NumberedShardSpec
final HdfsDataSegmentFinder hdfsDataSegmentFinder = new HdfsDataSegmentFinder(conf, new DefaultObjectMapper());
try {
hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), false);
}
catch (SegmentLoadingException e) {
Assert.assertTrue(e.getCause() instanceof IOException);
throw e;
}
}
private String getDescriptorPath(DataSegment segment)
{
final Path indexzip = new Path(String.valueOf(segment.getLoadSpec().get("path")));
return indexzip.getParent().toString() + "/" + DESCRIPTOR_JSON;
}
private String readContent(Path descriptor) throws IOException
{
try (FSDataInputStream is = fs.open(descriptor)) {
return IOUtils.toString(is);
}
}
}

View File

@ -26,8 +26,10 @@ import com.google.inject.Key;
import com.google.inject.multibindings.MapBinder;
import io.druid.data.SearchableVersionedDataFinder;
import io.druid.initialization.DruidModule;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentFinder;
import io.druid.segment.loading.LocalDataSegmentKiller;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.LocalDataSegmentPusher;
@ -55,6 +57,8 @@ public class LocalDataStorageDruidModule implements DruidModule
PolyBind.createChoice(
binder, "druid.storage.type", Key.get(DataSegmentPusher.class), Key.get(LocalDataSegmentPusher.class)
);
PolyBind.createChoice(binder, "druid.storage.type", Key.get(DataSegmentFinder.class), null);
}
private static void bindDeepStorageLocal(Binder binder)
@ -79,6 +83,11 @@ public class LocalDataStorageDruidModule implements DruidModule
.to(LocalDataSegmentPusher.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DataSegmentFinder.class))
.addBinding(SCHEME)
.to(LocalDataSegmentFinder.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.storage", LocalDataSegmentPusherConfig.class);
}
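
For context, other deep-storage extensions can expose their own finder through the same binding pattern. The sketch below is hypothetical: `MyDeepStorageDruidModule`, `MyDataSegmentFinder`, the `my-storage` scheme, and the `io.druid.guice.Binders`/`LazySingleton` import paths are assumptions for illustration; only the `Binders.dataSegmentFinderBinder` call mirrors what this commit adds for HDFS.

```java
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.inject.Binder;
import io.druid.guice.Binders;
import io.druid.guice.LazySingleton;
import io.druid.initialization.DruidModule;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;

import java.util.List;
import java.util.Set;

// Hypothetical "my-storage" deep-storage module, shown only to illustrate the binder pattern.
public class MyDeepStorageDruidModule implements DruidModule
{
  public static final String SCHEME = "my-storage";

  @Override
  public List<? extends Module> getJacksonModules()
  {
    return ImmutableList.of();
  }

  @Override
  public void configure(Binder binder)
  {
    // Registers the finder for druid.storage.type=my-storage so insert-segment-to-db can use it.
    Binders.dataSegmentFinderBinder(binder)
           .addBinding(SCHEME)
           .to(MyDataSegmentFinder.class)
           .in(LazySingleton.class);
  }
}

// Stub finder, for illustration only; a real one would walk the deep storage for
// descriptor.json/index.zip pairs, like the HDFS and local finders in this commit.
class MyDataSegmentFinder implements DataSegmentFinder
{
  @Override
  public Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException
  {
    return Sets.newHashSet();
  }
}
```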

View File

@ -0,0 +1,108 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Sets;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.guice.LocalDataStorageDruidModule;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
/**
*/
public class LocalDataSegmentFinder implements DataSegmentFinder
{
private static final Logger log = new Logger(LocalDataSegmentFinder.class);
private final ObjectMapper mapper;
@Inject
public LocalDataSegmentFinder(ObjectMapper mapper)
{
this.mapper = mapper;
}
@Override
public Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor)
throws SegmentLoadingException
{
final Set<DataSegment> segments = Sets.newHashSet();
final File workingDir = new File(workingDirPath);
if (!workingDir.isDirectory()) {
throw new SegmentLoadingException("Working directory [%s] didn't exist !?", workingDir);
}
recursiveSearchSegments(segments, workingDir, updateDescriptor);
return segments;
}
private void recursiveSearchSegments(Set<DataSegment> segments, File workingDir, boolean updateDescriptor)
throws SegmentLoadingException
{
for (File file : workingDir.listFiles()) {
if (file.isDirectory()) {
recursiveSearchSegments(segments, file, updateDescriptor);
} else if (file.getName().equals("descriptor.json")) {
final File indexZip = new File(file.getParentFile(), "index.zip");
if (indexZip.exists()) {
try {
final DataSegment dataSegment = mapper.readValue(FileUtils.readFileToString(file), DataSegment.class);
log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip.getAbsoluteFile());
final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
if (!loadSpec.get("type").equals(LocalDataStorageDruidModule.SCHEME) || !loadSpec.get("path")
.equals(indexZip.getAbsoluteFile())) {
loadSpec.put("type", LocalDataStorageDruidModule.SCHEME);
loadSpec.put("path", indexZip.getAbsolutePath());
if (updateDescriptor) {
log.info(
"Updating loadSpec in descriptor.json at [%s] with new path [%s]",
file.getAbsolutePath(),
indexZip.toString()
);
FileUtils.writeStringToFile(file, mapper.writeValueAsString(dataSegment));
}
}
segments.add(dataSegment);
}
catch (IOException e) {
throw new SegmentLoadingException(
e,
"Failed to read descriptor.json for segment located at [%s]",
file.getAbsoluteFile()
);
}
} else {
throw new SegmentLoadingException(
"index.zip didn't exist at [%s] while descripter.json exists!?",
indexZip.getAbsoluteFile()
);
}
}
}
}
}
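
As a quick illustration (not part of the commit), the local finder can be exercised the same way as the HDFS sketch earlier; here `updateDescriptor` is `true`, so any descriptor.json whose `loadSpec` no longer matches the on-disk index.zip location is rewritten in place. The class name and path are illustrative.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.loading.LocalDataSegmentFinder;
import io.druid.timeline.DataSegment;

import java.util.Set;

public class FindLocalSegmentsExample
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new DefaultObjectMapper();
    final LocalDataSegmentFinder finder = new LocalDataSegmentFinder(mapper);

    // true: besides returning the segments, rewrite each stale descriptor.json loadSpec.
    final Set<DataSegment> segments = finder.findSegments("/druid/storage/wikipedia", true);
    System.out.println("Found " + segments.size() + " segments");
  }
}
```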

View File

@ -0,0 +1,252 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
/**
*/
public class LocalDataSegmentFinderTest
{
private static final ObjectMapper mapper = new DefaultObjectMapper();
private static final String DESCRIPTOR_JSON = "descriptor.json";
private static final String INDEX_ZIP = "index.zip";
private static final DataSegment SEGMENT_1 = DataSegment.builder()
.dataSource("wikipedia")
.interval(
new Interval(
"2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z"
)
)
.version("2015-10-21T22:07:57.074Z")
.loadSpec(
ImmutableMap.<String, Object>of(
"type",
"local",
"path",
"/tmp/somewhere/index.zip"
)
)
.dimensions(ImmutableList.of("language", "page"))
.metrics(ImmutableList.of("count"))
.build();
private static final DataSegment SEGMENT_2 = DataSegment.builder(SEGMENT_1)
.interval(
new Interval(
"2013-09-01T00:00:00.000Z/2013-09-02T00:00:00.000Z"
)
)
.build();
private static final DataSegment SEGMENT_3 = DataSegment.builder(SEGMENT_1)
.interval(
new Interval(
"2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"
)
)
.version("2015-10-22T22:07:57.074Z")
.build();
private static final DataSegment SEGMENT_4_0 = DataSegment.builder(SEGMENT_1)
.interval(
new Interval(
"2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"
)
)
.shardSpec(new NumberedShardSpec(0, 2))
.build();
private static final DataSegment SEGMENT_4_1 = DataSegment.builder(SEGMENT_1)
.interval(
new Interval(
"2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"
)
)
.shardSpec(new NumberedShardSpec(1, 2))
.build();
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private File dataSourceDir;
private File descriptor1;
private File descriptor2;
private File descriptor3;
private File descriptor4_0;
private File descriptor4_1;
private File indexZip1;
private File indexZip2;
private File indexZip3;
private File indexZip4_0;
private File indexZip4_1;
@BeforeClass
public static void setUpStatic()
{
mapper.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered"));
}
@Before
public void setUp() throws Exception
{
dataSourceDir = temporaryFolder.newFolder();
descriptor1 = new File(dataSourceDir.getAbsolutePath() + "/interval1/v1/0", DESCRIPTOR_JSON);
descriptor2 = new File(dataSourceDir.getAbsolutePath() + "/interval2/v1/0", DESCRIPTOR_JSON);
descriptor3 = new File(dataSourceDir.getAbsolutePath() + "/interval3/v2/0", DESCRIPTOR_JSON);
descriptor4_0 = new File(dataSourceDir.getAbsolutePath() + "/interval4/v1/0", DESCRIPTOR_JSON);
descriptor4_1 = new File(dataSourceDir.getAbsolutePath() + "/interval4/v1/1", DESCRIPTOR_JSON);
descriptor1.getParentFile().mkdirs();
descriptor2.getParentFile().mkdirs();
descriptor3.getParentFile().mkdirs();
descriptor4_0.getParentFile().mkdirs();
descriptor4_1.getParentFile().mkdirs();
mapper.writeValue(descriptor1, SEGMENT_1);
mapper.writeValue(descriptor2, SEGMENT_2);
mapper.writeValue(descriptor3, SEGMENT_3);
mapper.writeValue(descriptor4_0, SEGMENT_4_0);
mapper.writeValue(descriptor4_1, SEGMENT_4_1);
indexZip1 = new File(descriptor1.getParentFile(), INDEX_ZIP);
indexZip2 = new File(descriptor2.getParentFile(), INDEX_ZIP);
indexZip3 = new File(descriptor3.getParentFile(), INDEX_ZIP);
indexZip4_0 = new File(descriptor4_0.getParentFile(), INDEX_ZIP);
indexZip4_1 = new File(descriptor4_1.getParentFile(), INDEX_ZIP);
indexZip1.createNewFile();
indexZip2.createNewFile();
indexZip3.createNewFile();
indexZip4_0.createNewFile();
indexZip4_1.createNewFile();
}
@Test
public void testFindSegments() throws SegmentLoadingException, IOException
{
final LocalDataSegmentFinder localDataSegmentFinder = new LocalDataSegmentFinder(mapper);
final Set<DataSegment> segments = localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), false);
Assert.assertEquals(5, segments.size());
DataSegment updatedSegment1 = null;
DataSegment updatedSegment2 = null;
DataSegment updatedSegment3 = null;
DataSegment updatedSegment4_0 = null;
DataSegment updatedSegment4_1 = null;
for (DataSegment dataSegment : segments) {
if (dataSegment.getIdentifier().equals(SEGMENT_1.getIdentifier())) {
updatedSegment1 = dataSegment;
} else if (dataSegment.getIdentifier().equals(SEGMENT_2.getIdentifier())) {
updatedSegment2 = dataSegment;
} else if (dataSegment.getIdentifier().equals(SEGMENT_3.getIdentifier())) {
updatedSegment3 = dataSegment;
} else if (dataSegment.getIdentifier().equals(SEGMENT_4_0.getIdentifier())) {
updatedSegment4_0 = dataSegment;
} else if (dataSegment.getIdentifier().equals(SEGMENT_4_1.getIdentifier())) {
updatedSegment4_1 = dataSegment;
} else {
Assert.fail("Unexpected segment");
}
}
Assert.assertEquals(descriptor1.getAbsolutePath(), getDescriptorPath(updatedSegment1));
Assert.assertEquals(descriptor2.getAbsolutePath(), getDescriptorPath(updatedSegment2));
Assert.assertEquals(descriptor3.getAbsolutePath(), getDescriptorPath(updatedSegment3));
Assert.assertEquals(descriptor4_0.getAbsolutePath(), getDescriptorPath(updatedSegment4_0));
Assert.assertEquals(descriptor4_1.getAbsolutePath(), getDescriptorPath(updatedSegment4_1));
final String serializedSegment1 = mapper.writeValueAsString(updatedSegment1);
final String serializedSegment2 = mapper.writeValueAsString(updatedSegment2);
final String serializedSegment3 = mapper.writeValueAsString(updatedSegment3);
final String serializedSegment4_0 = mapper.writeValueAsString(updatedSegment4_0);
final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1);
// since updateDescriptor was not enabled, descriptor.json still has stale information
Assert.assertNotEquals(serializedSegment1, FileUtils.readFileToString(descriptor1));
Assert.assertNotEquals(serializedSegment2, FileUtils.readFileToString(descriptor2));
Assert.assertNotEquals(serializedSegment3, FileUtils.readFileToString(descriptor3));
Assert.assertNotEquals(serializedSegment4_0, FileUtils.readFileToString(descriptor4_0));
Assert.assertNotEquals(serializedSegment4_1, FileUtils.readFileToString(descriptor4_1));
// enable updateDescriptor so that descriptor.json will be updated to reflect the new loadSpec
final Set<DataSegment> segments2 = localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), true);
Assert.assertEquals(segments, segments2);
Assert.assertEquals(serializedSegment1, FileUtils.readFileToString(descriptor1));
Assert.assertEquals(serializedSegment2, FileUtils.readFileToString(descriptor2));
Assert.assertEquals(serializedSegment3, FileUtils.readFileToString(descriptor3));
Assert.assertEquals(serializedSegment4_0, FileUtils.readFileToString(descriptor4_0));
Assert.assertEquals(serializedSegment4_1, FileUtils.readFileToString(descriptor4_1));
}
private String getDescriptorPath(DataSegment segment)
{
final File indexzip = new File(String.valueOf(segment.getLoadSpec().get("path")));
return indexzip.getParent() + "/" + DESCRIPTOR_JSON;
}
@Test(expected = SegmentLoadingException.class)
public void testFindSegmentsFail() throws SegmentLoadingException
{
// remove one of the index.zip files while keeping its descriptor.json
indexZip4_1.delete();
final LocalDataSegmentFinder localDataSegmentFinder = new LocalDataSegmentFinder(mapper);
localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), false);
}
@Test(expected = SegmentLoadingException.class)
public void testFindSegmentsFail2() throws SegmentLoadingException
{
// will fail to deserialize descriptor.json because DefaultObjectMapper doesn't recognize NumberedShardSpec
final LocalDataSegmentFinder localDataSegmentFinder = new LocalDataSegmentFinder(new DefaultObjectMapper());
try {
localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), false);
}
catch (SegmentLoadingException e) {
Assert.assertTrue(e.getCause() instanceof IOException);
throw e;
}
}
}

View File

@ -0,0 +1,138 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.logger.Logger;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Self;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
@Command(
name = "insert-segment-to-db",
description = "insert a segment into metadata storage"
)
public class InsertSegment extends GuiceRunnable
{
private static final Logger log = new Logger(InsertSegment.class);
@Option(name = "--workingDir", description = "The directory path where segments are stored. This tool will recursively look for segments underneath this directory and insert/update these segments in metdata storage.", required = true)
private String workingDirPath;
@Option(name = "--updateDescriptor", description = "if set to true, this tool will update loadSpec field in descriptor.json if the path in loadSpec is different from where desciptor.json was found. Default value is true", required = false)
private boolean updateDescriptor = true;
private ObjectMapper mapper;
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
public InsertSegment()
{
super(log);
}
@Override
protected List<? extends Module> getModules()
{
return ImmutableList.<Module>of(
new Module()
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bindInstance(
binder, Key.get(DruidNode.class, Self.class), new DruidNode("tools", "localhost", -1)
);
}
}
);
}
@Override
public void run()
{
final Injector injector = makeInjector();
mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
indexerMetadataStorageCoordinator = injector.getInstance(IndexerMetadataStorageCoordinator.class);
final DataSegmentFinder dataSegmentFinder = injector.getInstance(DataSegmentFinder.class);
log.info("Start seraching segments under [%s]", workingDirPath);
Set<DataSegment> segments = null;
try {
segments = dataSegmentFinder.findSegments(workingDirPath, updateDescriptor);
}
catch (SegmentLoadingException e) {
throw Throwables.propagate(e);
}
log.info(
"Done searching segments under [%s], [%d] segments were found",
workingDirPath,
segments.size()
);
try {
insertSegments(segments);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
log.info("Done processing [%d] segments", segments.size());
}
private void insertSegments(final Set<DataSegment> segments) throws IOException
{
final Set<DataSegment> segmentsInserted = indexerMetadataStorageCoordinator.announceHistoricalSegments(segments);
for (DataSegment dataSegment : segmentsInserted) {
log.info("Sucessfully inserted Segment [%s] into metadata storage", dataSegment.getIdentifier());
}
final Set<DataSegment> segmentsAlreadyExist = Sets.difference(segments, segmentsInserted);
if (!segmentsAlreadyExist.isEmpty()) {
for (DataSegment dataSegment : segmentsAlreadyExist) {
log.info("Segment [%s] already exists in metadata storage, updating the payload", dataSegment.getIdentifier());
}
indexerMetadataStorageCoordinator.updateSegmentMetadata(segmentsAlreadyExist);
}
}
}

View File

@ -69,7 +69,13 @@ public class Main
builder.withGroup("tools") builder.withGroup("tools")
.withDescription("Various tools for working with Druid") .withDescription("Various tools for working with Druid")
.withDefaultCommand(Help.class) .withDefaultCommand(Help.class)
.withCommands(ConvertProperties.class, DruidJsonValidator.class, PullDependencies.class, CreateTables.class); .withCommands(
ConvertProperties.class,
DruidJsonValidator.class,
PullDependencies.class,
CreateTables.class,
InsertSegment.class
);
builder.withGroup("index") builder.withGroup("index")
.withDescription("Run indexing for druid") .withDescription("Run indexing for druid")