diff --git a/docs/content/development/libraries.md b/docs/content/development/libraries.md
index 6bee326ea9c..22c57f0fb91 100644
--- a/docs/content/development/libraries.md
+++ b/docs/content/development/libraries.md
@@ -57,3 +57,8 @@ UIs
 * [mistercrunch/panoramix](https://github.com/mistercrunch/panoramix) - A web application to slice, dice and visualize data out of Druid
 * [grafana](https://github.com/Quantiply/grafana-plugins/tree/master/features/druid) - A plugin for [Grafana](http://grafana.org/)
 * [Pivot](https://github.com/implydata/pivot) - An exploratory analytics UI for Druid
+
+Tools
+---
+
+* [Insert Segments](../../operations/insert-segment-to-db.html) - A tool that can insert segments' metadata into Druid metadata storage.
\ No newline at end of file
diff --git a/docs/content/operations/insert-segment-to-db.md b/docs/content/operations/insert-segment-to-db.md
new file mode 100644
index 00000000000..77ba46cf3d9
--- /dev/null
+++ b/docs/content/operations/insert-segment-to-db.md
@@ -0,0 +1,95 @@
+---
+layout: doc_page
+---
+# insert-segment-to-db Tool
+
+`insert-segment-to-db` is a tool that can insert segments into Druid metadata storage. It is intended to be used
+to update the segment table in metadata storage after segments have been manually migrated from one place to another.
+It can also be used to insert missing segments into Druid, or even to recover the metadata storage by telling it where the
+segments are stored.
+
+Note: This tool expects the Druid cluster to be running in a "safe" mode, where there are no active tasks that could interfere
+with the segments being inserted. You can optionally bring down the cluster to be completely sure nothing is interfering.
+
+To make it work, you will have to provide the metadata storage credentials and the deep storage type through Java JVM arguments
+or a runtime.properties file. Specifically, this tool needs to know:
+
+`druid.metadata.storage.type`
+
+`druid.metadata.storage.connector.connectURI`
+
+`druid.metadata.storage.connector.user`
+
+`druid.metadata.storage.connector.password`
+
+`druid.storage.type`
+
+Besides the properties above, you also need to specify the location where the segments are stored and whether you want to
+update descriptor.json. These two can be provided through command-line arguments.
+
+`--workingDir` (Required)
+
+    The directory URI where segments are stored. This tool will recursively look for segments underneath this directory
+    and insert/update these segments in metadata storage.
+    Attention: workingDir must be a complete URI, which means it must be prefixed with a scheme. For example,
+    hdfs://hostname:port/segment_directory
+
+`--updateDescriptor` (Optional)
+
+    If set to true, this tool will update the `loadSpec` field in `descriptor.json` if the path in `loadSpec` is different from
+    where `descriptor.json` was found. Default value is `true`.
+
+Note: you will also need to load different Druid extensions depending on the metadata storage and deep storage you use. For example,
+if you use `mysql` as metadata storage and `HDFS` as deep storage, you should load the `mysql-metadata-storage` and `druid-hdfs-storage`
+extensions.
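+
+If you prefer not to pass these settings as JVM arguments, the same properties can live in a runtime.properties file on the
+Druid classpath. A minimal sketch, reusing the placeholder connection values from the example below:
+
+```
+druid.metadata.storage.type=mysql
+druid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid
+druid.metadata.storage.connector.user=druid
+druid.metadata.storage.connector.password=diurd
+druid.storage.type=hdfs
+druid.extensions.loadList=["mysql-metadata-storage","druid-hdfs-storage"]
+```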
+
+Example:
+
+Suppose your metadata storage is `mysql` and you've migrated some segments to a directory in HDFS, and that directory looks
+like this:
+
+```
+Directory path: /druid/storage/wikipedia
+
+├── 2013-08-31T000000.000Z_2013-09-01T000000.000Z
+│   └── 2015-10-21T22_07_57.074Z
+│       └── 0
+│           ├── descriptor.json
+│           └── index.zip
+├── 2013-09-01T000000.000Z_2013-09-02T000000.000Z
+│   └── 2015-10-21T22_07_57.074Z
+│       └── 0
+│           ├── descriptor.json
+│           └── index.zip
+├── 2013-09-02T000000.000Z_2013-09-03T000000.000Z
+│   └── 2015-10-21T22_07_57.074Z
+│       └── 0
+│           ├── descriptor.json
+│           └── index.zip
+└── 2013-09-03T000000.000Z_2013-09-04T000000.000Z
+    └── 2015-10-21T22_07_57.074Z
+        └── 0
+            ├── descriptor.json
+            └── index.zip
+```
+
+To load all these segments into `mysql`, run the command below:
+
+```
+java
+-Ddruid.metadata.storage.type=mysql
+-Ddruid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
+-Ddruid.metadata.storage.connector.user=druid
+-Ddruid.metadata.storage.connector.password=diurd
+-Ddruid.extensions.loadList=[\"mysql-metadata-storage\",\"druid-hdfs-storage\"]
+-Ddruid.storage.type=hdfs
+-cp $DRUID_CLASSPATH
+io.druid.cli.Main tools insert-segment-to-db --workingDir hdfs://host:port/druid/storage/wikipedia --updateDescriptor true
+```
+
+In this example, the `mysql` metadata storage settings and the deep storage type are provided through Java JVM arguments; you can
+optionally put all of them in a runtime.properties file and include it in the Druid classpath. Note that we also include
+`mysql-metadata-storage` and `druid-hdfs-storage` in the extension list.
+
+After running this command, the segments table in `mysql` should store the new location for each segment we just inserted.
diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentFinder.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentFinder.java
new file mode 100644
index 00000000000..19f098c8932
--- /dev/null
+++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentFinder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package io.druid.storage.hdfs; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; +import com.google.inject.Inject; +import com.metamx.common.logger.Logger; +import io.druid.segment.loading.DataSegmentFinder; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.timeline.DataSegment; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +/** + */ +public class HdfsDataSegmentFinder implements DataSegmentFinder +{ + + private static final Logger log = new Logger(HdfsDataSegmentFinder.class); + + private final Configuration config; + private final ObjectMapper mapper; + + @Inject + public HdfsDataSegmentFinder(Configuration config, ObjectMapper mapper) + { + this.config = config; + this.mapper = mapper; + } + + @Override + public Set findSegments(String workingDirPathStr, boolean updateDescriptor) + throws SegmentLoadingException + { + final Set segments = Sets.newHashSet(); + final Path workingDirPath = new Path(workingDirPathStr); + FileSystem fs; + try { + fs = workingDirPath.getFileSystem(config); + + log.info(fs.getScheme()); + log.info("FileSystem URI:" + fs.getUri().toString()); + + if (!fs.exists(workingDirPath)) { + throw new SegmentLoadingException("Working directory [%s] doesn't exist.", workingDirPath); + } + + if (!fs.isDirectory(workingDirPath)) { + throw new SegmentLoadingException("Working directory [%s] is not a directory!?", workingDirPath); + } + + final RemoteIterator it = fs.listFiles(workingDirPath, true); + while (it.hasNext()) { + final LocatedFileStatus locatedFileStatus = it.next(); + final Path path = locatedFileStatus.getPath(); + if (path.getName().equals("descriptor.json")) { + final Path indexZip = new Path(path.getParent(), "index.zip"); + if (fs.exists(indexZip)) { + final DataSegment dataSegment = mapper.readValue(fs.open(path), DataSegment.class); + log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip); + + final Map loadSpec = dataSegment.getLoadSpec(); + final String pathWithoutScheme = indexZip.toUri().getPath(); + + if (!loadSpec.get("type").equals(HdfsStorageDruidModule.SCHEME) || !loadSpec.get("path") + .equals(pathWithoutScheme)) { + loadSpec.put("type", HdfsStorageDruidModule.SCHEME); + loadSpec.put("path", pathWithoutScheme); + if (updateDescriptor) { + log.info("Updating loadSpec in descriptor.json at [%s] with new path [%s]", path, pathWithoutScheme); + mapper.writeValue(fs.create(path, true), dataSegment); + } + } + segments.add(dataSegment); + } else { + throw new SegmentLoadingException( + "index.zip didn't exist at [%s] while descripter.json exists!?", + indexZip + ); + } + } + } + } + catch (IOException e) { + throw new SegmentLoadingException(e, "Problems interacting with filesystem[%s].", workingDirPath); + } + + return segments; + } + +} diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java index a0d9c39a64c..2d93fb9b563 100644 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java @@ -38,17 +38,21 @@ public class HdfsLoadSpec implements LoadSpec { private final Path path; 
final HdfsDataSegmentPuller puller; + @JsonCreator public HdfsLoadSpec( @JacksonInject HdfsDataSegmentPuller puller, @JsonProperty(value = "path", required = true) String path - ){ + ) + { Preconditions.checkNotNull(path); this.path = new Path(path); this.puller = puller; } + @JsonProperty("path") - public final String getPathString(){ + public final String getPathString() + { return path.toString(); } diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java index c7b45926481..525c293402c 100644 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java @@ -91,6 +91,7 @@ public class HdfsStorageDruidModule implements DruidModule Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPuller.class).in(LazySingleton.class); Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPusher.class).in(LazySingleton.class); Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentKiller.class).in(LazySingleton.class); + Binders.dataSegmentFinderBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentFinder.class).in(LazySingleton.class); final Configuration conf = new Configuration(); @@ -103,9 +104,11 @@ public class HdfsStorageDruidModule implements DruidModule try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); FileSystem.get(conf); - } catch(IOException ex) { + } + catch (IOException ex) { throw Throwables.propagate(ex); - } finally { + } + finally { Thread.currentThread().setContextClassLoader(currCtxCl); } diff --git a/extensions/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentFinderTest.java b/extensions/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentFinderTest.java new file mode 100644 index 00000000000..b765c9dc3b4 --- /dev/null +++ b/extensions/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentFinderTest.java @@ -0,0 +1,287 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.segment.loading; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.storage.hdfs.HdfsDataSegmentFinder; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NumberedShardSpec; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.joda.time.Interval; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.Map; +import java.util.Set; + +/** + */ +public class HdfsDataSegmentFinderTest +{ + + private static final ObjectMapper mapper = new DefaultObjectMapper(); + private static final String DESCRIPTOR_JSON = "descriptor.json"; + private static final String INDEX_ZIP = "index.zip"; + private static final DataSegment SEGMENT_1 = DataSegment.builder() + .dataSource("wikipedia") + .interval( + new Interval( + "2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z" + ) + ) + .version("2015-10-21T22:07:57.074Z") + .loadSpec( + ImmutableMap.of( + "type", + "hdfs", + "path", + "hdfs://abc.com:1234/somewhere/index.zip" + ) + ) + .dimensions(ImmutableList.of("language", "page")) + .metrics(ImmutableList.of("count")) + .build(); + + private static final DataSegment SEGMENT_2 = DataSegment.builder(SEGMENT_1) + .interval( + new Interval( + "2013-09-01T00:00:00.000Z/2013-09-02T00:00:00.000Z" + ) + ) + .build(); + + private static final DataSegment SEGMENT_3 = DataSegment.builder(SEGMENT_1) + .interval( + new Interval( + "2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z" + ) + ) + .version("2015-10-22T22:07:57.074Z") + .build(); + + private static final DataSegment SEGMENT_4_0 = DataSegment.builder(SEGMENT_1) + .interval( + new Interval( + "2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z" + ) + ) + .shardSpec(new NumberedShardSpec(0, 2)) + .build(); + + private static final DataSegment SEGMENT_4_1 = DataSegment.builder(SEGMENT_1) + .interval( + new Interval( + "2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z" + ) + ) + .shardSpec(new NumberedShardSpec(1, 2)) + .build(); + + private static MiniDFSCluster miniCluster; + private static File hdfsTmpDir; + private static URI uriBase; + private static Configuration conf; + private static FileSystem fs; + + private Path dataSourceDir; + private Path descriptor1; + private Path descriptor2; + private Path descriptor3; + private Path descriptor4_0; + private Path descriptor4_1; + private Path indexZip1; + private Path indexZip2; + private Path indexZip3; + private Path indexZip4_0; + private Path indexZip4_1; + + @BeforeClass + public static void setupStatic() throws IOException + { + mapper.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered")); + + hdfsTmpDir = File.createTempFile("hdfsDataSource", "dir"); + hdfsTmpDir.deleteOnExit(); + if (!hdfsTmpDir.delete()) { + throw new IOException(String.format("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath())); + } + conf = new Configuration(true); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
hdfsTmpDir.getAbsolutePath()); + miniCluster = new MiniDFSCluster.Builder(conf).build(); + uriBase = miniCluster.getURI(); + fs = miniCluster.getFileSystem(); + } + + @AfterClass + public static void tearDownStatic() + { + if (miniCluster != null) { + miniCluster.shutdown(true); + } + } + + @Before + public void setUp() throws IOException + { + dataSourceDir = new Path(new Path(uriBase), "/usr/dataSource"); + descriptor1 = new Path(dataSourceDir, "interval1/v1/0/" + DESCRIPTOR_JSON); + descriptor2 = new Path(dataSourceDir, "interval2/v1/0/" + DESCRIPTOR_JSON); + descriptor3 = new Path(dataSourceDir, "interval3/v2/0/" + DESCRIPTOR_JSON); + descriptor4_0 = new Path(dataSourceDir, "interval4/v1/0/" + DESCRIPTOR_JSON); + descriptor4_1 = new Path(dataSourceDir, "interval4/v1/1/" + DESCRIPTOR_JSON); + indexZip1 = new Path(descriptor1.getParent(), INDEX_ZIP); + indexZip2 = new Path(descriptor2.getParent(), INDEX_ZIP); + indexZip3 = new Path(descriptor3.getParent(), INDEX_ZIP); + indexZip4_0 = new Path(descriptor4_0.getParent(), INDEX_ZIP); + indexZip4_1 = new Path(descriptor4_1.getParent(), INDEX_ZIP); + + mapper.writeValue(fs.create(descriptor1), SEGMENT_1); + mapper.writeValue(fs.create(descriptor2), SEGMENT_2); + mapper.writeValue(fs.create(descriptor3), SEGMENT_3); + mapper.writeValue(fs.create(descriptor4_0), SEGMENT_4_0); + mapper.writeValue(fs.create(descriptor4_1), SEGMENT_4_1); + + create(indexZip1); + create(indexZip2); + create(indexZip3); + create(indexZip4_0); + create(indexZip4_1); + } + + private void create(Path indexZip1) throws IOException + { + try (FSDataOutputStream os = fs.create(indexZip1)) { + } + } + + @Test + public void testFindSegments() throws Exception + { + final HdfsDataSegmentFinder hdfsDataSegmentFinder = new HdfsDataSegmentFinder(conf, mapper); + + final Set segments = hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), false); + + Assert.assertEquals(5, segments.size()); + + DataSegment updatedSegment1 = null; + DataSegment updatedSegment2 = null; + DataSegment updatedSegment3 = null; + DataSegment updatedSegment4_0 = null; + DataSegment updatedSegment4_1 = null; + for (DataSegment dataSegment : segments) { + if (dataSegment.getIdentifier().equals(SEGMENT_1.getIdentifier())) { + updatedSegment1 = dataSegment; + } else if (dataSegment.getIdentifier().equals(SEGMENT_2.getIdentifier())) { + updatedSegment2 = dataSegment; + } else if (dataSegment.getIdentifier().equals(SEGMENT_3.getIdentifier())) { + updatedSegment3 = dataSegment; + } else if (dataSegment.getIdentifier().equals(SEGMENT_4_0.getIdentifier())) { + updatedSegment4_0 = dataSegment; + } else if (dataSegment.getIdentifier().equals(SEGMENT_4_1.getIdentifier())) { + updatedSegment4_1 = dataSegment; + } else { + Assert.fail("Unexpected segment"); + } + } + + Assert.assertEquals(descriptor1.toUri().getPath(), getDescriptorPath(updatedSegment1)); + Assert.assertEquals(descriptor2.toUri().getPath(), getDescriptorPath(updatedSegment2)); + Assert.assertEquals(descriptor3.toUri().getPath(), getDescriptorPath(updatedSegment3)); + Assert.assertEquals(descriptor4_0.toUri().getPath(), getDescriptorPath(updatedSegment4_0)); + Assert.assertEquals(descriptor4_1.toUri().getPath(), getDescriptorPath(updatedSegment4_1)); + + final String serializedSegment1 = mapper.writeValueAsString(updatedSegment1); + final String serializedSegment2 = mapper.writeValueAsString(updatedSegment2); + final String serializedSegment3 = mapper.writeValueAsString(updatedSegment3); + final String serializedSegment4_0 = 
mapper.writeValueAsString(updatedSegment4_0); + final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1); + + // since updateDescriptor was not enabled, descriptor.json still has stale information + Assert.assertNotEquals(serializedSegment1, readContent(descriptor1)); + Assert.assertNotEquals(serializedSegment2, readContent(descriptor2)); + Assert.assertNotEquals(serializedSegment3, readContent(descriptor3)); + Assert.assertNotEquals(serializedSegment4_0, readContent(descriptor4_0)); + Assert.assertNotEquals(serializedSegment4_1, readContent(descriptor4_1)); + + // enable updateDescriptor so that descriptors.json will be updated to relfect the new loadSpec + final Set segments2 = hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), true); + + Assert.assertEquals(segments, segments2); + Assert.assertEquals(serializedSegment1, readContent(descriptor1)); + Assert.assertEquals(serializedSegment2, readContent(descriptor2)); + Assert.assertEquals(serializedSegment3, readContent(descriptor3)); + Assert.assertEquals(serializedSegment4_0, readContent(descriptor4_0)); + Assert.assertEquals(serializedSegment4_1, readContent(descriptor4_1)); + } + + @Test(expected = SegmentLoadingException.class) + public void testFindSegmentsFail() throws Exception + { + // remove one of index.zip while keeping its descriptor.json + fs.delete(indexZip4_1, false); + + final HdfsDataSegmentFinder hdfsDataSegmentFinder = new HdfsDataSegmentFinder(conf, mapper); + hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), false); + } + + @Test(expected = SegmentLoadingException.class) + public void testFindSegmentsFail2() throws SegmentLoadingException + { + // will fail to desierialize descriptor.json because DefaultObjectMapper doesn't recognize NumberedShardSpec + final HdfsDataSegmentFinder hdfsDataSegmentFinder = new HdfsDataSegmentFinder(conf, new DefaultObjectMapper()); + try { + hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), false); + } + catch (SegmentLoadingException e) { + Assert.assertTrue(e.getCause() instanceof IOException); + throw e; + } + } + + private String getDescriptorPath(DataSegment segment) + { + final Path indexzip = new Path(String.valueOf(segment.getLoadSpec().get("path"))); + return indexzip.getParent().toString() + "/" + DESCRIPTOR_JSON; + } + + private String readContent(Path descriptor) throws IOException + { + final FSDataInputStream is = fs.open(descriptor); + final String content = IOUtils.toString(is); + is.close(); + return content; + } +} diff --git a/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java b/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java index 29bf764be35..d8338bebf81 100644 --- a/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java +++ b/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java @@ -26,8 +26,10 @@ import com.google.inject.Key; import com.google.inject.multibindings.MapBinder; import io.druid.data.SearchableVersionedDataFinder; import io.druid.initialization.DruidModule; +import io.druid.segment.loading.DataSegmentFinder; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.loading.LocalDataSegmentFinder; import io.druid.segment.loading.LocalDataSegmentKiller; import io.druid.segment.loading.LocalDataSegmentPuller; import io.druid.segment.loading.LocalDataSegmentPusher; @@ -55,6 +57,8 @@ public class LocalDataStorageDruidModule implements DruidModule 
PolyBind.createChoice( binder, "druid.storage.type", Key.get(DataSegmentPusher.class), Key.get(LocalDataSegmentPusher.class) ); + + PolyBind.createChoice(binder, "druid.storage.type", Key.get(DataSegmentFinder.class), null); } private static void bindDeepStorageLocal(Binder binder) @@ -79,6 +83,11 @@ public class LocalDataStorageDruidModule implements DruidModule .to(LocalDataSegmentPusher.class) .in(LazySingleton.class); + PolyBind.optionBinder(binder, Key.get(DataSegmentFinder.class)) + .addBinding(SCHEME) + .to(LocalDataSegmentFinder.class) + .in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.storage", LocalDataSegmentPusherConfig.class); } diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentFinder.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentFinder.java new file mode 100644 index 00000000000..54e651ac43e --- /dev/null +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentFinder.java @@ -0,0 +1,108 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.segment.loading; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.util.Sets; +import com.google.inject.Inject; +import com.metamx.common.logger.Logger; +import io.druid.guice.LocalDataStorageDruidModule; +import io.druid.timeline.DataSegment; +import org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +/** + */ +public class LocalDataSegmentFinder implements DataSegmentFinder +{ + + private static final Logger log = new Logger(LocalDataSegmentFinder.class); + + private final ObjectMapper mapper; + + @Inject + public LocalDataSegmentFinder(ObjectMapper mapper) + { + this.mapper = mapper; + } + + @Override + public Set findSegments(String workingDirPath, boolean updateDescriptor) + throws SegmentLoadingException + { + + final Set segments = Sets.newHashSet(); + final File workingDir = new File(workingDirPath); + if (!workingDir.isDirectory()) { + throw new SegmentLoadingException("Working directory [%s] didn't exist !?", workingDir); + } + recursiveSearchSegments(segments, workingDir, updateDescriptor); + return segments; + } + + private void recursiveSearchSegments(Set segments, File workingDir, boolean updateDescriptor) + throws SegmentLoadingException + { + for (File file : workingDir.listFiles()) { + if (file.isDirectory()) { + recursiveSearchSegments(segments, file, updateDescriptor); + } else if (file.getName().equals("descriptor.json")) { + final File indexZip = new File(file.getParentFile(), "index.zip"); + if (indexZip.exists()) { + try { + final DataSegment dataSegment = mapper.readValue(FileUtils.readFileToString(file), DataSegment.class); + log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip.getAbsoluteFile()); + final Map loadSpec = dataSegment.getLoadSpec(); + if (!loadSpec.get("type").equals(LocalDataStorageDruidModule.SCHEME) || !loadSpec.get("path") + .equals(indexZip.getAbsoluteFile())) { + loadSpec.put("type", LocalDataStorageDruidModule.SCHEME); + loadSpec.put("path", indexZip.getAbsolutePath()); + if (updateDescriptor) { + log.info( + "Updating loadSpec in descriptor.json at [%s] with new path [%s]", + file.getAbsolutePath(), + indexZip.toString() + ); + FileUtils.writeStringToFile(file, mapper.writeValueAsString(dataSegment)); + } + } + segments.add(dataSegment); + } + catch (IOException e) { + throw new SegmentLoadingException( + e, + "Failed to read descriptor.json for segment located at [%s]", + file.getAbsoluteFile() + ); + } + } else { + throw new SegmentLoadingException( + "index.zip didn't exist at [%s] while descripter.json exists!?", + indexZip.getAbsoluteFile() + ); + } + } + } + } +} diff --git a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentFinderTest.java b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentFinderTest.java new file mode 100644 index 00000000000..b4b7c65b1a4 --- /dev/null +++ b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentFinderTest.java @@ -0,0 +1,252 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.loading; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NumberedShardSpec; +import org.apache.commons.io.FileUtils; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +/** + */ +public class LocalDataSegmentFinderTest +{ + + private static final ObjectMapper mapper = new DefaultObjectMapper(); + private static final String DESCRIPTOR_JSON = "descriptor.json"; + private static final String INDEX_ZIP = "index.zip"; + private static final DataSegment SEGMENT_1 = DataSegment.builder() + .dataSource("wikipedia") + .interval( + new Interval( + "2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z" + ) + ) + .version("2015-10-21T22:07:57.074Z") + .loadSpec( + ImmutableMap.of( + "type", + "local", + "path", + "/tmp/somewhere/index.zip" + ) + ) + .dimensions(ImmutableList.of("language", "page")) + .metrics(ImmutableList.of("count")) + .build(); + + private static final DataSegment SEGMENT_2 = DataSegment.builder(SEGMENT_1) + .interval( + new Interval( + "2013-09-01T00:00:00.000Z/2013-09-02T00:00:00.000Z" + ) + ) + .build(); + + private static final DataSegment SEGMENT_3 = DataSegment.builder(SEGMENT_1) + .interval( + new Interval( + "2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z" + ) + ) + .version("2015-10-22T22:07:57.074Z") + .build(); + + private static final DataSegment SEGMENT_4_0 = DataSegment.builder(SEGMENT_1) + .interval( + new Interval( + "2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z" + ) + ) + .shardSpec(new NumberedShardSpec(0, 2)) + .build(); + + private static final DataSegment SEGMENT_4_1 = DataSegment.builder(SEGMENT_1) + .interval( + new Interval( + "2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z" + ) + ) + .shardSpec(new NumberedShardSpec(1, 2)) + .build(); + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private File dataSourceDir; + private File descriptor1; + private File descriptor2; + private File descriptor3; + private File descriptor4_0; + private File descriptor4_1; + private File indexZip1; + private File indexZip2; + private File indexZip3; + private File indexZip4_0; + private File indexZip4_1; + + @BeforeClass + public static void setUpStatic() + { + mapper.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered")); + } + + @Before + public void setUp() throws Exception + { + + dataSourceDir = temporaryFolder.newFolder(); + descriptor1 = new File(dataSourceDir.getAbsolutePath() + "/interval1/v1/0", DESCRIPTOR_JSON); + descriptor2 = new File(dataSourceDir.getAbsolutePath() + "/interval2/v1/0", DESCRIPTOR_JSON); + descriptor3 = new 
File(dataSourceDir.getAbsolutePath() + "/interval3/v2/0", DESCRIPTOR_JSON); + descriptor4_0 = new File(dataSourceDir.getAbsolutePath() + "/interval4/v1/0", DESCRIPTOR_JSON); + descriptor4_1 = new File(dataSourceDir.getAbsolutePath() + "/interval4/v1/1", DESCRIPTOR_JSON); + + descriptor1.getParentFile().mkdirs(); + descriptor2.getParentFile().mkdirs(); + descriptor3.getParentFile().mkdirs(); + descriptor4_0.getParentFile().mkdirs(); + descriptor4_1.getParentFile().mkdirs(); + + mapper.writeValue(descriptor1, SEGMENT_1); + mapper.writeValue(descriptor2, SEGMENT_2); + mapper.writeValue(descriptor3, SEGMENT_3); + mapper.writeValue(descriptor4_0, SEGMENT_4_0); + mapper.writeValue(descriptor4_1, SEGMENT_4_1); + + indexZip1 = new File(descriptor1.getParentFile(), INDEX_ZIP); + indexZip2 = new File(descriptor2.getParentFile(), INDEX_ZIP); + indexZip3 = new File(descriptor3.getParentFile(), INDEX_ZIP); + indexZip4_0 = new File(descriptor4_0.getParentFile(), INDEX_ZIP); + indexZip4_1 = new File(descriptor4_1.getParentFile(), INDEX_ZIP); + + indexZip1.createNewFile(); + indexZip2.createNewFile(); + indexZip3.createNewFile(); + indexZip4_0.createNewFile(); + indexZip4_1.createNewFile(); + } + + @Test + public void testFindSegments() throws SegmentLoadingException, IOException + { + final LocalDataSegmentFinder localDataSegmentFinder = new LocalDataSegmentFinder(mapper); + + final Set segments = localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), false); + + Assert.assertEquals(5, segments.size()); + + DataSegment updatedSegment1 = null; + DataSegment updatedSegment2 = null; + DataSegment updatedSegment3 = null; + DataSegment updatedSegment4_0 = null; + DataSegment updatedSegment4_1 = null; + for (DataSegment dataSegment : segments) { + if (dataSegment.getIdentifier().equals(SEGMENT_1.getIdentifier())) { + updatedSegment1 = dataSegment; + } else if (dataSegment.getIdentifier().equals(SEGMENT_2.getIdentifier())) { + updatedSegment2 = dataSegment; + } else if (dataSegment.getIdentifier().equals(SEGMENT_3.getIdentifier())) { + updatedSegment3 = dataSegment; + } else if (dataSegment.getIdentifier().equals(SEGMENT_4_0.getIdentifier())) { + updatedSegment4_0 = dataSegment; + } else if (dataSegment.getIdentifier().equals(SEGMENT_4_1.getIdentifier())) { + updatedSegment4_1 = dataSegment; + } else { + Assert.fail("Unexpected segment"); + } + } + + Assert.assertEquals(descriptor1.getAbsolutePath(), getDescriptorPath(updatedSegment1)); + Assert.assertEquals(descriptor2.getAbsolutePath(), getDescriptorPath(updatedSegment2)); + Assert.assertEquals(descriptor3.getAbsolutePath(), getDescriptorPath(updatedSegment3)); + Assert.assertEquals(descriptor4_0.getAbsolutePath(), getDescriptorPath(updatedSegment4_0)); + Assert.assertEquals(descriptor4_1.getAbsolutePath(), getDescriptorPath(updatedSegment4_1)); + + final String serializedSegment1 = mapper.writeValueAsString(updatedSegment1); + final String serializedSegment2 = mapper.writeValueAsString(updatedSegment2); + final String serializedSegment3 = mapper.writeValueAsString(updatedSegment3); + final String serializedSegment4_0 = mapper.writeValueAsString(updatedSegment4_0); + final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1); + + // since updateDescriptor was not enabled, descriptor.json still has stale information + Assert.assertNotEquals(serializedSegment1, FileUtils.readFileToString(descriptor1)); + Assert.assertNotEquals(serializedSegment2, FileUtils.readFileToString(descriptor2)); + 
Assert.assertNotEquals(serializedSegment3, FileUtils.readFileToString(descriptor3)); + Assert.assertNotEquals(serializedSegment4_0, FileUtils.readFileToString(descriptor4_0)); + Assert.assertNotEquals(serializedSegment4_1, FileUtils.readFileToString(descriptor4_1)); + + // enable updateDescriptor so that descriptors.json will be updated to relfect the new loadSpec + final Set segments2 = localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), true); + + Assert.assertEquals(segments, segments2); + Assert.assertEquals(serializedSegment1, FileUtils.readFileToString(descriptor1)); + Assert.assertEquals(serializedSegment2, FileUtils.readFileToString(descriptor2)); + Assert.assertEquals(serializedSegment3, FileUtils.readFileToString(descriptor3)); + Assert.assertEquals(serializedSegment4_0, FileUtils.readFileToString(descriptor4_0)); + Assert.assertEquals(serializedSegment4_1, FileUtils.readFileToString(descriptor4_1)); + } + + private String getDescriptorPath(DataSegment segment) + { + final File indexzip = new File(String.valueOf(segment.getLoadSpec().get("path"))); + return indexzip.getParent() + "/" + DESCRIPTOR_JSON; + } + + @Test(expected = SegmentLoadingException.class) + public void testFindSegmentsFail() throws SegmentLoadingException + { + // remove one of index.zip while keeping its descriptor.json + indexZip4_1.delete(); + + final LocalDataSegmentFinder localDataSegmentFinder = new LocalDataSegmentFinder(mapper); + localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), false); + } + + @Test(expected = SegmentLoadingException.class) + public void testFindSegmentsFail2() throws SegmentLoadingException + { + // will fail to desierialize descriptor.json because DefaultObjectMapper doesn't recognize NumberedShardSpec + final LocalDataSegmentFinder localDataSegmentFinder = new LocalDataSegmentFinder(new DefaultObjectMapper()); + try { + localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), false); + } + catch (SegmentLoadingException e) { + Assert.assertTrue(e.getCause() instanceof IOException); + throw e; + } + } +} diff --git a/services/src/main/java/io/druid/cli/InsertSegment.java b/services/src/main/java/io/druid/cli/InsertSegment.java new file mode 100644 index 00000000000..7f451bce8d9 --- /dev/null +++ b/services/src/main/java/io/druid/cli/InsertSegment.java @@ -0,0 +1,138 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package io.druid.cli;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.metamx.common.logger.Logger;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import io.druid.guice.JsonConfigProvider;
+import io.druid.guice.annotations.Json;
+import io.druid.guice.annotations.Self;
+import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import io.druid.segment.loading.DataSegmentFinder;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.server.DruidNode;
+import io.druid.timeline.DataSegment;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ */
+@Command(
+    name = "insert-segment-to-db",
+    description = "insert a segment into metadata storage"
+)
+public class InsertSegment extends GuiceRunnable
+{
+  private static final Logger log = new Logger(InsertSegment.class);
+
+  @Option(name = "--workingDir", description = "The directory path where segments are stored. This tool will recursively look for segments underneath this directory and insert/update these segments in metadata storage.", required = true)
+  private String workingDirPath;
+
+  @Option(name = "--updateDescriptor", description = "If set to true, this tool will update the loadSpec field in descriptor.json if the path in loadSpec is different from where descriptor.json was found. Default value is true", required = false)
+  private boolean updateDescriptor = true;
+
+  private ObjectMapper mapper;
+  private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
+
+  public InsertSegment()
+  {
+    super(log);
+  }
+
+  @Override
+  protected List<? extends Module> getModules()
+  {
+    return ImmutableList.of(
+        new Module()
+        {
+          @Override
+          public void configure(Binder binder)
+          {
+            JsonConfigProvider.bindInstance(
+                binder, Key.get(DruidNode.class, Self.class), new DruidNode("tools", "localhost", -1)
+            );
+          }
+        }
+    );
+  }
+
+  @Override
+  public void run()
+  {
+    final Injector injector = makeInjector();
+    mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
+    indexerMetadataStorageCoordinator = injector.getInstance(IndexerMetadataStorageCoordinator.class);
+    final DataSegmentFinder dataSegmentFinder = injector.getInstance(DataSegmentFinder.class);
+
+    log.info("Start searching segments under [%s]", workingDirPath);
+
+    Set<DataSegment> segments = null;
+    try {
+      segments = dataSegmentFinder.findSegments(workingDirPath, updateDescriptor);
+    }
+    catch (SegmentLoadingException e) {
+      Throwables.propagate(e);
+    }
+
+    log.info(
+        "Done searching segments under [%s], [%d] segments were found",
+        workingDirPath,
+        segments.size()
+    );
+
+    try {
+      insertSegments(segments);
+    }
+    catch (IOException e) {
+      Throwables.propagate(e);
+    }
+
+    log.info("Done processing [%d] segments", segments.size());
+  }
+
+  private void insertSegments(final Set<DataSegment> segments) throws IOException
+  {
+    final Set<DataSegment> segmentsInserted = indexerMetadataStorageCoordinator.announceHistoricalSegments(segments);
+    for (DataSegment dataSegment : segmentsInserted) {
+      log.info("Successfully inserted segment [%s] into metadata storage", dataSegment.getIdentifier());
+    }
+    final Set<DataSegment> segmentsAlreadyExist = Sets.difference(segments, segmentsInserted);
+    if (!segmentsAlreadyExist.isEmpty()) {
+      for
(DataSegment dataSegment : segmentsAlreadyExist) { + log.info("Segment [%s] already exists in metadata storage, updating the payload", dataSegment.getIdentifier()); + } + indexerMetadataStorageCoordinator.updateSegmentMetadata(segmentsAlreadyExist); + } + } + + +} diff --git a/services/src/main/java/io/druid/cli/Main.java b/services/src/main/java/io/druid/cli/Main.java index 4193d2b4f68..ca5ec1c1126 100644 --- a/services/src/main/java/io/druid/cli/Main.java +++ b/services/src/main/java/io/druid/cli/Main.java @@ -69,7 +69,13 @@ public class Main builder.withGroup("tools") .withDescription("Various tools for working with Druid") .withDefaultCommand(Help.class) - .withCommands(ConvertProperties.class, DruidJsonValidator.class, PullDependencies.class, CreateTables.class); + .withCommands( + ConvertProperties.class, + DruidJsonValidator.class, + PullDependencies.class, + CreateTables.class, + InsertSegment.class + ); builder.withGroup("index") .withDescription("Run indexing for druid")
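Note: the `DataSegmentFinder` interface that `HdfsDataSegmentFinder` and `LocalDataSegmentFinder` implement, and that `InsertSegment` resolves through Guice, is not included in this diff. Based on the implementations above, a minimal sketch of its shape would be roughly the following; the Javadoc wording is descriptive only, not the actual source:

```java
package io.druid.segment.loading;

import io.druid.timeline.DataSegment;

import java.util.Set;

/**
 * Scans a deep-storage working directory for descriptor.json files and returns the
 * corresponding segments, optionally rewriting each descriptor's loadSpec in place.
 */
public interface DataSegmentFinder
{
  /**
   * @param workingDirPath   URI or path of the directory to scan recursively
   * @param updateDescriptor whether to rewrite descriptor.json when its loadSpec is stale
   */
  Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException;
}
```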