Merge branch 'task-stuff' into indexing_refactor

Conflicts:
	merger/src/main/java/com/metamx/druid/merger/worker/config/WorkerConfig.java
Gian Merlino 2013-02-26 13:03:56 -08:00
commit d32a6284ce
81 changed files with 1356 additions and 877 deletions

View File

@ -18,8 +18,7 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-client</artifactId>
@ -29,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.2.8-SNAPSHOT</version>
<version>0.3.4-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -139,18 +139,24 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
// build set of segments to query
Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();
List<TimelineObjectHolder<String, ServerSelector>> serversLookup = Lists.newLinkedList();
for (Interval interval : rewrittenQuery.getIntervals()) {
List<TimelineObjectHolder<String, ServerSelector>> serversLookup = timeline.lookup(interval);
serversLookup.addAll(timeline.lookup(interval));
}
for (TimelineObjectHolder<String, ServerSelector> holder : serversLookup) {
for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
ServerSelector selector = chunk.getObject();
final SegmentDescriptor descriptor = new SegmentDescriptor(
holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
);
// Let tool chest filter out unneeded segments
final List<TimelineObjectHolder<String, ServerSelector>> filteredServersLookup =
toolChest.filterSegments(query, serversLookup);
segments.add(Pair.of(selector, descriptor));
}
for (TimelineObjectHolder<String, ServerSelector> holder : filteredServersLookup) {
for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
ServerSelector selector = chunk.getObject();
final SegmentDescriptor descriptor = new SegmentDescriptor(
holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
);
segments.add(Pair.of(selector, descriptor));
}
}
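
Editor's note: the inline diff above interleaves the removed and added lines, so here is a hedged reconstruction of the new flow for readability (all names are taken from the hunk itself): timeline holders are gathered for every query interval, passed through the tool chest's new filterSegments hook, and only then expanded into (server, segment descriptor) pairs.

Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();

// Gather candidate segments across all query intervals
List<TimelineObjectHolder<String, ServerSelector>> serversLookup = Lists.newLinkedList();
for (Interval interval : rewrittenQuery.getIntervals()) {
  serversLookup.addAll(timeline.lookup(interval));
}

// Let tool chest filter out unneeded segments
final List<TimelineObjectHolder<String, ServerSelector>> filteredServersLookup =
    toolChest.filterSegments(query, serversLookup);

for (TimelineObjectHolder<String, ServerSelector> holder : filteredServersLookup) {
  for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
    ServerSelector selector = chunk.getObject();
    final SegmentDescriptor descriptor = new SegmentDescriptor(
        holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
    );
    segments.add(Pair.of(selector, descriptor));
  }
}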

View File

@ -22,18 +22,19 @@ package com.metamx.druid.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.metamx.common.guava.Sequence;
import com.metamx.druid.LogicalSegment;
import com.metamx.druid.Query;
import com.metamx.emitter.service.ServiceMetricEvent;
import java.util.List;
/**
* The broker-side (also used by server in some cases) API for a specific Query type. This API is still undergoing
* evolution and is only semi-stable, so proprietary Query implementations should be ready for the potential
* maintenance burden when upgrading versions.
*/
public interface QueryToolChest<ResultType, QueryType extends Query<ResultType>>
public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultType>>
{
public QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner);
public abstract QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner);
/**
* This method doesn't belong here, but it's here for now just to make it work.
@ -41,11 +42,24 @@ public interface QueryToolChest<ResultType, QueryType extends Query<ResultType>>
* @param seqOfSequences
* @return
*/
public Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences);
public ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
public Function<ResultType, ResultType> makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn);
public TypeReference<ResultType> getResultTypeReference();
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query);
public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner);
public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner);
public abstract Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences);
public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
public abstract Function<ResultType, ResultType> makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn);
public abstract TypeReference<ResultType> getResultTypeReference();
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query) {
return null;
}
public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner) {
return runner;
}
public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner) {
return runner;
}
public <T extends LogicalSegment> List<T> filterSegments(QueryType query, List<T> segments) {
return segments;
}
}
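
With QueryToolChest turned into an abstract class, the cache-strategy, decoration, and segment-filtering hooks now have pass-through defaults, so concrete tool chests only override what they actually need. A minimal sketch of how broker-side code can compose the hooks uniformly; the decorate helper below is hypothetical, not part of the commit:

public static <T> QueryRunner<T> decorate(QueryToolChest<T, Query<T>> toolChest, QueryRunner<T> baseRunner)
{
  // Both decoration hooks default to returning the runner unchanged unless a
  // concrete tool chest overrides them, so this call chain is always safe.
  return toolChest.postMergeQueryDecoration(
      toolChest.mergeResults(
          toolChest.preMergeQueryDecoration(baseRunner)
      )
  );
}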

View File

@ -54,7 +54,7 @@ import java.util.Properties;
/**
*/
public class GroupByQueryQueryToolChest implements QueryToolChest<Row, GroupByQuery>
public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
{
private static final TypeReference<Row> TYPE_REFERENCE = new TypeReference<Row>(){};
@ -177,22 +177,4 @@ public class GroupByQueryQueryToolChest implements QueryToolChest<Row, GroupByQu
{
return TYPE_REFERENCE;
}
@Override
public CacheStrategy<Row, Object, GroupByQuery> getCacheStrategy(GroupByQuery query)
{
return null;
}
@Override
public QueryRunner<Row> preMergeQueryDecoration(QueryRunner<Row> runner)
{
return runner;
}
@Override
public QueryRunner<Row> postMergeQueryDecoration(QueryRunner<Row> runner)
{
return runner;
}
}

View File

@ -51,7 +51,7 @@ import java.util.Map;
import java.util.Set;
public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
{
private static final TypeReference<SegmentAnalysis> TYPE_REFERENCE = new TypeReference<SegmentAnalysis>(){};
private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
@ -220,18 +220,6 @@ public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<Segmen
};
}
@Override
public QueryRunner<SegmentAnalysis> preMergeQueryDecoration(QueryRunner<SegmentAnalysis> runner)
{
return runner;
}
@Override
public QueryRunner<SegmentAnalysis> postMergeQueryDecoration(QueryRunner<SegmentAnalysis> runner)
{
return runner;
}
private Ordering<SegmentAnalysis> getOrdering()
{
return new Ordering<SegmentAnalysis>()

View File

@ -65,7 +65,7 @@ import java.util.Set;
/**
*/
public class SearchQueryQueryToolChest implements QueryToolChest<Result<SearchResultValue>, SearchQuery>
public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResultValue>, SearchQuery>
{
private static final byte SEARCH_QUERY = 0x2;
@ -263,12 +263,6 @@ public class SearchQueryQueryToolChest implements QueryToolChest<Result<SearchRe
);
}
@Override
public QueryRunner<Result<SearchResultValue>> postMergeQueryDecoration(final QueryRunner<Result<SearchResultValue>> runner)
{
return runner;
}
private static class SearchThresholdAdjustingQueryRunner implements QueryRunner<Result<SearchResultValue>>
{
private final QueryRunner<Result<SearchResultValue>> runner;

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Ordering;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.LogicalSegment;
import com.metamx.druid.Query;
import com.metamx.druid.collect.OrderedMergeSequence;
import com.metamx.druid.query.BySegmentSkippingQueryRunner;
@ -37,7 +38,6 @@ import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.TimeBoundaryResultValue;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@ -47,7 +47,7 @@ import java.util.List;
/**
*/
public class TimeBoundaryQueryQueryToolChest
implements QueryToolChest<Result<TimeBoundaryResultValue>, TimeBoundaryQuery>
extends QueryToolChest<Result<TimeBoundaryResultValue>, TimeBoundaryQuery>
{
private static final byte TIMEBOUNDARY_QUERY = 0x3;
@ -58,6 +58,16 @@ public class TimeBoundaryQueryQueryToolChest
{
};
@Override
public <T extends LogicalSegment> List<T> filterSegments(TimeBoundaryQuery query, List<T> input)
{
if(input.size() <= 1) {
return input;
}
return Lists.newArrayList(input.get(0), input.get(input.size() - 1));
}
@Override
public QueryRunner<Result<TimeBoundaryResultValue>> mergeResults(
final QueryRunner<Result<TimeBoundaryResultValue>> runner
@ -169,18 +179,6 @@ public class TimeBoundaryQueryQueryToolChest
};
}
@Override
public QueryRunner<Result<TimeBoundaryResultValue>> preMergeQueryDecoration(QueryRunner<Result<TimeBoundaryResultValue>> runner)
{
return runner;
}
@Override
public QueryRunner<Result<TimeBoundaryResultValue>> postMergeQueryDecoration(QueryRunner<Result<TimeBoundaryResultValue>> runner)
{
return runner;
}
public Ordering<Result<TimeBoundaryResultValue>> getOrdering()
{
return Ordering.natural();
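
The filterSegments override above is the first user of the new hook: only the earliest and latest segments can contribute a time boundary query's min/max timestamps, so everything in between is dropped before any per-segment work happens. A small hypothetical harness (not part of the commit) showing the effect on an interval-ordered list:

// Hypothetical harness: build a few LogicalSegments and watch the override keep
// only the first and last entries of an interval-ordered list.
final List<LogicalSegment> segments = Lists.newArrayList();
for (final String spec : new String[]{"2013-01-01/2013-01-02", "2013-01-02/2013-01-03", "2013-01-03/2013-01-04"}) {
  segments.add(
      new LogicalSegment()
      {
        @Override
        public Interval getInterval()
        {
          return new Interval(spec);
        }
      }
  );
}

// The query argument is unused by this override, so null suffices for the sketch.
List<LogicalSegment> filtered = new TimeBoundaryQueryQueryToolChest().filterSegments(null, segments);
// filtered now holds only the 2013-01-01 and 2013-01-03 segments.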

View File

@ -61,7 +61,7 @@ import java.util.Map;
/**
*/
public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
{
private static final byte TIMESERIES_QUERY = 0x0;
@ -259,12 +259,6 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
return new IntervalChunkingQueryRunner<Result<TimeseriesResultValue>>(runner, Period.months(1));
}
@Override
public QueryRunner<Result<TimeseriesResultValue>> postMergeQueryDecoration(QueryRunner<Result<TimeseriesResultValue>> runner)
{
return runner;
}
public Ordering<Result<TimeseriesResultValue>> getOrdering()
{
return Ordering.natural();

View File

@ -18,8 +18,7 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-common</artifactId>
@ -29,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.2.8-SNAPSHOT</version>
<version>0.3.4-SNAPSHOT</version>
</parent>
<dependencies>
@ -81,8 +80,12 @@
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>

View File

@ -0,0 +1,8 @@
package com.metamx.druid;
import org.joda.time.Interval;
public interface LogicalSegment
{
public Interval getInterval();
}

View File

@ -24,7 +24,7 @@ import org.joda.time.Interval;
/**
*/
public class TimelineObjectHolder<VersionType, ObjectType>
public class TimelineObjectHolder<VersionType, ObjectType> implements LogicalSegment
{
private final Interval interval;
private final VersionType version;
@ -41,6 +41,7 @@ public class TimelineObjectHolder<VersionType, ObjectType>
this.object = object;
}
@Override
public Interval getInterval()
{
return interval;

View File

@ -36,6 +36,7 @@ import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.fasterxml.jackson.datatype.guava.GuavaModule;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import com.google.common.base.Throwables;
import com.metamx.common.Granularity;
@ -171,10 +172,11 @@ public class DefaultObjectMapper extends ObjectMapper
}
);
registerModule(serializerModule);
registerModule(new GuavaModule());
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
configure(MapperFeature.AUTO_DETECT_GETTERS, false);
configure(MapperFeature.AUTO_DETECT_CREATORS, false);
// configure(MapperFeature.AUTO_DETECT_CREATORS, false); https://github.com/FasterXML/jackson-databind/issues/170
configure(MapperFeature.AUTO_DETECT_FIELDS, false);
configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
configure(MapperFeature.AUTO_DETECT_SETTERS, false);
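
Registering GuavaModule here gives Jackson serde for Guava types such as Optional, which is also why AbstractTask (further down in this commit) can drop its "nullable interval for Jackson" workaround. A hedged round-trip sketch, not part of the commit:

ObjectMapper mapper = new DefaultObjectMapper();

// Optional.of(x) should serialize as x itself, and an absent Optional as null.
String json = mapper.writeValueAsString(Optional.of("2013-01-01/2013-02-01"));
Optional<String> back = mapper.readValue(json, new TypeReference<Optional<String>>() {});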

View File

@ -0,0 +1,126 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.utils;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.metamx.common.ISE;
import com.metamx.common.StreamUtils;
import com.metamx.common.logger.Logger;
import sun.misc.IOUtils;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
/**
*/
public class CompressionUtils
{
private static final Logger log = new Logger(CompressionUtils.class);
public static long zip(File directory, File outputZipFile) throws IOException
{
if (!directory.isDirectory()) {
throw new IOException(String.format("directory[%s] is not a directory", directory));
}
if (!outputZipFile.getName().endsWith(".zip")) {
log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory);
}
long totalSize = 0;
ZipOutputStream zipOut = null;
try {
zipOut = new ZipOutputStream(new FileOutputStream(outputZipFile));
File[] files = directory.listFiles();
for (File file : files) {
log.info("Adding file[%s] with size[%,d]. Total size[%,d]", file, file.length(), totalSize);
if (file.length() >= Integer.MAX_VALUE) {
zipOut.close();
outputZipFile.delete();
throw new IOException(String.format("file[%s] too large [%,d]", file, file.length()));
}
zipOut.putNextEntry(new ZipEntry(file.getName()));
totalSize += ByteStreams.copy(Files.newInputStreamSupplier(file), zipOut);
}
}
finally {
Closeables.closeQuietly(zipOut);
}
return totalSize;
}
public static void unzip(File pulledFile, File outDir) throws IOException
{
if (!(outDir.exists() && outDir.isDirectory())) {
throw new ISE("outDir[%s] must exist and be a directory", outDir);
}
log.info("Unzipping file[%s] to [%s]", pulledFile, outDir);
InputStream in = null;
try {
in = new BufferedInputStream(new FileInputStream(pulledFile));
unzip(in, outDir);
}
finally {
Closeables.closeQuietly(in);
}
}
public static void unzip(InputStream in, File outDir) throws IOException
{
ZipInputStream zipIn = new ZipInputStream(in);
ZipEntry entry;
while ((entry = zipIn.getNextEntry()) != null) {
OutputStream out = null;
try {
out = new FileOutputStream(new File(outDir, entry.getName()));
ByteStreams.copy(zipIn, out);
zipIn.closeEntry();
}
finally {
Closeables.closeQuietly(out);
}
}
}
public static void gunzip(File pulledFile, File outDir) throws IOException
{
log.info("Gunzipping file[%s] to [%s]", pulledFile, outDir);
StreamUtils.copyToFileAndClose(new GZIPInputStream(new FileInputStream(pulledFile)), outDir);
if (!pulledFile.delete()) {
log.error("Could not delete tmpFile[%s].", pulledFile);
}
}
}
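
A hedged usage sketch for the new utility (all paths are hypothetical): zip() returns the total uncompressed size of the entries it wrote, and unzip() expects the output directory to already exist.

File segmentDir = new File("/tmp/segment-to-ship");
File zipFile = new File("/tmp/segment-to-ship.zip");
long uncompressedBytes = CompressionUtils.zip(segmentDir, zipFile);

File outDir = new File("/tmp/unpacked");
if (!outDir.mkdirs() && !outDir.isDirectory()) {
  throw new IOException(String.format("Could not create outDir[%s]", outDir));
}
CompressionUtils.unzip(zipFile, outDir);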

View File

@ -19,12 +19,15 @@
package com.metamx.druid.utils;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.guava.Comparators;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.TreeSet;
@ -63,6 +66,17 @@ public class JodaUtils
return retVal;
}
public static boolean overlaps(final Interval i, Iterable<Interval> intervals) {
return Iterables.any(intervals, new Predicate<Interval>()
{
@Override
public boolean apply(@Nullable Interval input)
{
return input.overlaps(i);
}
});
}
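
A quick hedged illustration of the new overlaps() helper (intervals are hypothetical): the check is true because the candidate overlaps the second interval in the list.

boolean hit = JodaUtils.overlaps(
    new Interval("2013-01-15/2013-02-15"),
    Arrays.asList(
        new Interval("2012-01-01/2012-02-01"),
        new Interval("2013-02-01/2013-03-01")
    )
);   // true
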
public static DateTime minDateTime(DateTime... times)
{
if (times == null) {

View File

@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
<version>0.2.8-SNAPSHOT</version>
<version>0.3.4-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.2.8-SNAPSHOT</version>
<version>0.3.4-SNAPSHOT</version>
</parent>
<dependencies>
@ -66,4 +66,4 @@
</plugin>
</plugins>
</build>
</project>
</project>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.2.8-SNAPSHOT</version>
<version>0.3.4-SNAPSHOT</version>
</parent>
<modules>

View File

@ -1,6 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-examples-rand</artifactId>
@ -10,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId>
<version>0.2.8-SNAPSHOT</version>
<version>0.3.4-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -12,7 +12,7 @@ import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.realtime.MetadataUpdater;
import com.metamx.druid.realtime.MetadataUpdaterConfig;
import com.metamx.druid.realtime.RealtimeNode;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.phonebook.PhoneBook;
@ -72,8 +72,8 @@ public class RealtimeStandaloneMain
// dummyMetadataUpdater will not send updates to db because standalone demo has no db
rn.setMetadataUpdater(dummyMetadataUpdater);
rn.setSegmentPusher(
new SegmentPusher()
rn.setDataSegmentPusher(
new DataSegmentPusher()
{
@Override
public DataSegment push(File file, DataSegment segment) throws IOException

View File

@ -1,6 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-examples-twitter</artifactId>
@ -10,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId>
<version>0.2.8-SNAPSHOT</version>
<version>0.3.4-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -12,7 +12,7 @@ import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.realtime.MetadataUpdater;
import com.metamx.druid.realtime.MetadataUpdaterConfig;
import com.metamx.druid.realtime.RealtimeNode;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.phonebook.PhoneBook;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
@ -48,10 +48,12 @@ public class RealtimeStandaloneMain
rn.setPhoneBook(dummyPhoneBook);
MetadataUpdater dummyMetadataUpdater =
new MetadataUpdater(new DefaultObjectMapper(),
new MetadataUpdater(
new DefaultObjectMapper(),
Config.createFactory(Initialization.loadProperties()).build(MetadataUpdaterConfig.class),
dummyPhoneBook,
null) {
null
) {
@Override
public void publishSegment(DataSegment segment) throws IOException
{
@ -74,8 +76,8 @@ public class RealtimeStandaloneMain
// dummyMetadataUpdater will not send updates to db because standalone demo has no db
rn.setMetadataUpdater(dummyMetadataUpdater);
rn.setSegmentPusher(
new SegmentPusher()
rn.setDataSegmentPusher(
new DataSegmentPusher()
{
@Override
public DataSegment push(File file, DataSegment segment) throws IOException

View File

@ -18,8 +18,7 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-index-common</artifactId>
@ -29,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.2.8-SNAPSHOT</version>
<version>0.3.4-SNAPSHOT</version>
</parent>
<dependencies>
@ -95,4 +94,4 @@
</plugin>
</plugins>
</build>
</project>
</project>

View File

@ -72,30 +72,30 @@ class SimpleColumn implements Column
@Override
public DictionaryEncodedColumn getDictionaryEncoding()
{
return dictionaryEncodedColumn.get();
return dictionaryEncodedColumn == null ? null : dictionaryEncodedColumn.get();
}
@Override
public RunLengthColumn getRunLengthColumn()
{
return runLengthColumn.get();
return runLengthColumn == null ? null : runLengthColumn.get();
}
@Override
public GenericColumn getGenericColumn()
{
return genericColumn.get();
return genericColumn == null ? null : genericColumn.get();
}
@Override
public ComplexColumn getComplexColumn()
{
return complexColumn.get();
return complexColumn == null ? null : complexColumn.get();
}
@Override
public BitmapIndex getBitmapIndex()
{
return bitmapIndex.get();
return bitmapIndex == null ? null : bitmapIndex.get();
}
}
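
These null guards change the contract for callers: a SimpleColumn that lacks a given representation now returns null instead of throwing a NullPointerException from a missing supplier. A hedged caller-side sketch (the column variable is an assumption):

final GenericColumn generic = column.getGenericColumn();
if (generic == null) {
  // This column has no generic representation; fall back to another accessor
  // (e.g. getDictionaryEncoding()) or treat the column as absent.
} else {
  // Safe to use the generic column here.
}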

View File

@ -21,6 +21,7 @@ package com.metamx.druid.index.v1;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -62,7 +63,6 @@ import com.metamx.druid.kv.VSizeIndexedInts;
import com.metamx.druid.utils.SerializerUtils;
import it.uniroma3.mat.extendedset.intset.ConciseSet;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.Interval;
import java.io.ByteArrayOutputStream;
@ -369,7 +369,8 @@ public class IndexIO
);
}
LinkedHashSet<String> skippedFiles = Sets.newLinkedHashSet();
final LinkedHashSet<String> skippedFiles = Sets.newLinkedHashSet();
final Set<String> skippedDimensions = Sets.newLinkedHashSet();
for (String filename : v8SmooshedFiles.getInternalFilenames()) {
log.info("Processing file[%s]", filename);
if (filename.startsWith("dim_")) {
@ -392,6 +393,12 @@ public class IndexIO
dimBuffer, GenericIndexed.stringStrategy
);
if (dictionary.size() == 0) {
log.info("Dimension[%s] had cardinality 0, equivalent to no column, so skipping.", dimension);
skippedDimensions.add(dimension);
continue;
}
VSizeIndexedInts singleValCol = null;
VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer());
GenericIndexed<ImmutableConciseSet> bitmaps = bitmapIndexes.get(dimension);
@ -555,35 +562,49 @@ public class IndexIO
channel.write(ByteBuffer.wrap(specBytes));
serdeficator.write(channel);
channel.close();
} else if ("index.drd".equals(filename)) {
final ByteBuffer indexBuffer = v8SmooshedFiles.mapFile(filename);
indexBuffer.get(); // Skip the version byte
final GenericIndexed<String> dims = GenericIndexed.read(
indexBuffer, GenericIndexed.stringStrategy
);
final GenericIndexed<String> availableMetrics = GenericIndexed.read(
indexBuffer, GenericIndexed.stringStrategy
);
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
Set<String> columns = Sets.newTreeSet();
columns.addAll(Lists.newArrayList(dims));
columns.addAll(Lists.newArrayList(availableMetrics));
GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.stringStrategy);
final int numBytes = cols.getSerializedSize() + dims.getSerializedSize() + 16;
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
cols.writeToChannel(writer);
dims.writeToChannel(writer);
serializerUtils.writeLong(writer, dataInterval.getStartMillis());
serializerUtils.writeLong(writer, dataInterval.getEndMillis());
writer.close();
} else {
skippedFiles.add(filename);
}
}
final ByteBuffer indexBuffer = v8SmooshedFiles.mapFile("index.drd");
indexBuffer.get(); // Skip the version byte
final GenericIndexed<String> dims8 = GenericIndexed.read(
indexBuffer, GenericIndexed.stringStrategy
);
final GenericIndexed<String> dims9 = GenericIndexed.fromIterable(
Iterables.filter(
dims8, new Predicate<String>()
{
@Override
public boolean apply(String s)
{
return !skippedDimensions.contains(s);
}
}
),
GenericIndexed.stringStrategy
);
final GenericIndexed<String> availableMetrics = GenericIndexed.read(
indexBuffer, GenericIndexed.stringStrategy
);
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
Set<String> columns = Sets.newTreeSet();
columns.addAll(Lists.newArrayList(dims9));
columns.addAll(Lists.newArrayList(availableMetrics));
GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.stringStrategy);
final int numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16;
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
cols.writeToChannel(writer);
dims9.writeToChannel(writer);
serializerUtils.writeLong(writer, dataInterval.getStartMillis());
serializerUtils.writeLong(writer, dataInterval.getEndMillis());
writer.close();
log.info("Skipped files[%s]", skippedFiles);
v9Smoosher.close();

View File

@ -18,8 +18,7 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-indexer</artifactId>
@ -29,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.2.8-SNAPSHOT</version>
<version>0.3.4-SNAPSHOT</version>
</parent>
<dependencies>
@ -58,14 +57,6 @@
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

View File

@ -266,8 +266,7 @@ public class DeterminePartitionsJob implements Jobby
Context context
) throws IOException, InterruptedException
{
// Create group key
// TODO -- There are more efficient ways to do this
// Create group key, there are probably more efficient ways of doing this
final Map<String, Set<String>> dims = Maps.newTreeMap();
for(final String dim : inputRow.getDimensions()) {
final Set<String> dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim));
@ -394,6 +393,9 @@ public class DeterminePartitionsJob implements Jobby
final Interval interval = maybeInterval.get();
final byte[] groupKey = interval.getStart().toString().getBytes(Charsets.UTF_8);
// Emit row-counter value.
write(context, groupKey, new DimValueCount("", "", 1));
for(final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
final String dim = dimAndValues.getKey();
@ -510,9 +512,23 @@ public class DeterminePartitionsJob implements Jobby
Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable
) throws IOException, InterruptedException
{
PeekingIterator<DimValueCount> iterator = Iterators.peekingIterator(combinedIterable.iterator());
final DateTime bucket = new DateTime(new String(keyBytes.getGroupKey(), Charsets.UTF_8));
final PeekingIterator<DimValueCount> iterator = Iterators.peekingIterator(combinedIterable.iterator());
// "iterator" will take us over many candidate dimensions
log.info(
"Determining partitions for interval: %s",
config.getGranularitySpec().bucketInterval(bucket).orNull()
);
// First DVC should be the total row count indicator
final DimValueCount firstDvc = iterator.next();
final int totalRows = firstDvc.numRows;
if(!firstDvc.dim.equals("") || !firstDvc.value.equals("")) {
throw new IllegalStateException("WTF?! Expected total row indicator on first k/v pair!");
}
// "iterator" will now take us over many candidate dimensions
DimPartitions currentDimPartitions = null;
DimPartition currentDimPartition = null;
String currentDimPartitionStart = null;
@ -636,8 +652,6 @@ public class DeterminePartitionsJob implements Jobby
throw new ISE("No suitable partitioning dimension found!");
}
final int totalRows = dimPartitionss.values().iterator().next().getRows();
int maxCardinality = Integer.MIN_VALUE;
long minVariance = Long.MAX_VALUE;
DimPartitions minVariancePartitions = null;
@ -645,12 +659,14 @@ public class DeterminePartitionsJob implements Jobby
for(final DimPartitions dimPartitions : dimPartitionss.values()) {
if(dimPartitions.getRows() != totalRows) {
throw new ISE(
"WTF?! Dimension[%s] row count %,d != expected row count %,d",
log.info(
"Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
dimPartitions.dim,
dimPartitions.getRows(),
totalRows
);
continue;
}
// Make sure none of these shards are oversized
@ -684,7 +700,6 @@ public class DeterminePartitionsJob implements Jobby
throw new ISE("No suitable partitioning dimension found!");
}
final DateTime bucket = new DateTime(new String(keyBytes.getGroupKey(), Charsets.UTF_8));
final OutputStream out = Utils.makePathAndOutputStream(
context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles()
);

View File

@ -662,8 +662,9 @@ public class HadoopDruidIndexerConfig
return new Path(
String.format(
"%s/%s_%s/%s/%s",
"%s/%s/%s_%s/%s/%s",
getSegmentOutputDir(),
dataSource,
bucketInterval.getStart().toString(),
bucketInterval.getEnd().toString(),
getVersion().toString(),

View File

@ -379,7 +379,8 @@ public class IndexGeneratorJob implements Jobby
);
} else if (outputFS instanceof LocalFileSystem) {
loadSpec = ImmutableMap.<String, Object>of(
"type", "test"
"type", "local",
"path", indexOutURI.getPath()
);
} else {
throw new ISE("Unknown file system[%s]", outputFS.getClass());

View File

@ -18,8 +18,7 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-merger</artifactId>
@ -29,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.2.8-SNAPSHOT</version>
<version>0.3.4-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -20,13 +20,15 @@
package com.metamx.druid.merger.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.metamx.druid.loading.S3SegmentGetterConfig;
import com.metamx.druid.loading.S3SegmentPuller;
import com.metamx.druid.loading.S3ZippedSegmentPuller;
import com.google.common.collect.Maps;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.SegmentPuller;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.task.Task;
@ -34,6 +36,7 @@ import com.metamx.emitter.service.ServiceEmitter;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.io.File;
import java.util.List;
import java.util.Map;
/**
@ -45,7 +48,7 @@ public class TaskToolbox
private final TaskActionClient taskActionClient;
private final ServiceEmitter emitter;
private final RestS3Service s3Client;
private final SegmentPusher segmentPusher;
private final DataSegmentPusher segmentPusher;
private final SegmentKiller segmentKiller;
private final ObjectMapper objectMapper;
@ -54,7 +57,7 @@ public class TaskToolbox
TaskActionClient taskActionClient,
ServiceEmitter emitter,
RestS3Service s3Client,
SegmentPusher segmentPusher,
DataSegmentPusher segmentPusher,
SegmentKiller segmentKiller,
ObjectMapper objectMapper
)
@ -83,7 +86,7 @@ public class TaskToolbox
return emitter;
}
public SegmentPusher getSegmentPusher()
public DataSegmentPusher getSegmentPusher()
{
return segmentPusher;
}
@ -98,21 +101,27 @@ public class TaskToolbox
return objectMapper;
}
public Map<String, SegmentPuller> getSegmentGetters(final Task task)
public Map<DataSegment, File> getSegments(final Task task, List<DataSegment> segments)
throws SegmentLoadingException
{
final S3SegmentGetterConfig getterConfig = new S3SegmentGetterConfig()
{
@Override
public File getCacheDirectory()
{
return new File(config.getTaskDir(task), "fetched_segments");
}
};
final SingleSegmentLoader loader = new SingleSegmentLoader(
new S3DataSegmentPuller(s3Client),
new MMappedQueryableIndexFactory(),
new SegmentLoaderConfig()
{
@Override
public File getCacheDirectory()
{
return new File(config.getTaskDir(task), "fetched_segments");
}
}
);
return ImmutableMap.<String, SegmentPuller>builder()
.put("s3", new S3SegmentPuller(s3Client, getterConfig))
.put("s3_union", new S3SegmentPuller(s3Client, getterConfig))
.put("s3_zip", new S3ZippedSegmentPuller(s3Client, getterConfig))
.build();
Map<DataSegment, File> retVal = Maps.newLinkedHashMap();
for (DataSegment segment : segments) {
retVal.put(segment, loader.getSegmentFiles(segment));
}
return retVal;
}
}
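
TaskToolbox now hands tasks a single entry point for pulling down their input segments, built on SingleSegmentLoader instead of the old per-loadSpec-type puller map. A hedged usage sketch; the task, segment list, and log variables are assumptions:

// Fetch all input segments for a task into its working directory in one call.
final Map<DataSegment, File> segmentFiles = toolbox.getSegments(task, segmentsToMerge);
for (Map.Entry<DataSegment, File> entry : segmentFiles.entrySet()) {
  log.info("Fetched segment[%s] to[%s]", entry.getKey(), entry.getValue());
}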

View File

@ -1,25 +1,31 @@
package com.metamx.druid.merger.common.actions;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.ToStringResponseHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.curator.x.discovery.ServiceInstance;
import com.netflix.curator.x.discovery.ServiceProvider;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
public class RemoteTaskActionClient implements TaskActionClient
{
private final HttpClient httpClient;
private final ServiceProvider serviceProvider;
private final ObjectMapper jsonMapper;
private static final Logger log = new Logger(RemoteTaskActionClient.class);
public RemoteTaskActionClient(HttpClient httpClient, ObjectMapper jsonMapper)
public RemoteTaskActionClient(HttpClient httpClient, ServiceProvider serviceProvider, ObjectMapper jsonMapper)
{
this.httpClient = httpClient;
this.serviceProvider = serviceProvider;
this.jsonMapper = jsonMapper;
}
@ -34,20 +40,36 @@ public class RemoteTaskActionClient implements TaskActionClient
.go(new ToStringResponseHandler(Charsets.UTF_8))
.get();
// TODO Figure out how to check HTTP status code
if(response.equals("")) {
return null;
} else {
return jsonMapper.readValue(response, taskAction.getReturnTypeReference());
}
final Map<String, Object> responseDict = jsonMapper.readValue(
response,
new TypeReference<Map<String, Object>>() {}
);
return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference());
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public URI getServiceUri() throws URISyntaxException
private URI getServiceUri() throws Exception
{
return new URI("http://localhost:8087/mmx/merger/v1/action");
final ServiceInstance instance = serviceProvider.getInstance();
final String scheme;
final String host;
final int port;
final String path = "/mmx/merger/v1/action";
host = instance.getAddress();
if (instance.getSslPort() != null && instance.getSslPort() > 0) {
scheme = "https";
port = instance.getSslPort();
} else {
scheme = "http";
port = instance.getPort();
}
return new URI(scheme, null, host, port, path, null, null);
}
}
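
RemoteTaskActionClient now locates the coordinator through Curator service discovery rather than a hardcoded localhost URI, and unwraps the {"result": ...} envelope that IndexerCoordinatorResource returns (see the doAction change further down). A hedged wiring sketch; the builder calls reflect the Curator x-discovery API of that era and the variable names are assumptions:

// Worker-side wiring: point a ServiceProvider at the coordinator's registered
// service name (druid.worker.masterService, added to WorkerConfig in this commit).
ServiceProvider coordinatorServiceProvider = serviceDiscovery
    .serviceProviderBuilder()
    .serviceName(workerConfig.getMasterService())
    .build();
coordinatorServiceProvider.start();

TaskActionClient actionClient = new RemoteTaskActionClient(httpClient, coordinatorServiceProvider, jsonMapper);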

View File

@ -51,27 +51,8 @@ public class SegmentInsertAction implements TaskAction<Void>
@Override
public Void perform(TaskActionToolbox toolbox)
{
// Verify that each of these segments-to-insert falls under some lock
// TODO: Should this be done while holding the giant lock on TaskLockbox? (To prevent anyone from grabbing
// TODO: these locks out from under us while the operation is ongoing.) Probably not necessary.
final List<TaskLock> taskLocks = toolbox.getTaskLockbox().findLocksForTask(task);
for(final DataSegment segment : segments) {
final boolean ok = Iterables.any(
taskLocks, new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock taskLock)
{
return taskLock.getVersion().equals(segment.getVersion())
&& taskLock.getDataSource().equals(segment.getDataSource())
&& taskLock.getInterval().contains(segment.getInterval());
}
}
);
if(!ok) {
throw new ISE("No currently-held lock covers segment: %s", segment);
}
if(!toolbox.taskLockCoversSegments(task, segments, false)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
try {

View File

@ -51,27 +51,8 @@ public class SegmentNukeAction implements TaskAction<Void>
@Override
public Void perform(TaskActionToolbox toolbox)
{
// Verify that each of these segments-to-nuke falls under some lock
// TODO: Should this be done while holding the giant lock on TaskLockbox? (To prevent anyone from grabbing
// TODO: these locks out from under us while the operation is ongoing.) Probably not necessary.
final List<TaskLock> taskLocks = toolbox.getTaskLockbox().findLocksForTask(task);
for(final DataSegment segment : segments) {
final boolean ok = Iterables.any(
taskLocks, new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock taskLock)
{
return taskLock.getVersion().compareTo(segment.getVersion()) >= 0
&& taskLock.getDataSource().equals(segment.getDataSource())
&& taskLock.getInterval().contains(segment.getInterval());
}
}
);
if(!ok) {
throw new ISE("No currently-held lock covers segment: %s", segment);
}
if(!toolbox.taskLockCoversSegments(task, segments, true)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
try {

View File

@ -1,10 +1,19 @@
package com.metamx.druid.merger.common.actions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.metamx.common.ISE;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.MergerDBCoordinator;
import com.metamx.druid.merger.coordinator.TaskLockbox;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.emitter.service.ServiceEmitter;
import java.util.List;
import java.util.Set;
public class TaskActionToolbox
{
private final TaskQueue taskQueue;
@ -44,4 +53,43 @@ public class TaskActionToolbox
{
return emitter;
}
public boolean taskLockCoversSegments(
final Task task,
final Set<DataSegment> segments,
final boolean allowOlderVersions
)
{
// Verify that each of these segments falls under some lock
// NOTE: It is possible for our lock to be revoked (if the task has failed and given up its locks) after we check
// NOTE: it and before we perform the segment insert, but, that should be OK since the worst that happens is we
// NOTE: insert some segments from the task but not others.
final List<TaskLock> taskLocks = getTaskLockbox().findLocksForTask(task);
for(final DataSegment segment : segments) {
final boolean ok = Iterables.any(
taskLocks, new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock taskLock)
{
final boolean versionOk = allowOlderVersions
? taskLock.getVersion().compareTo(segment.getVersion()) >= 0
: taskLock.getVersion().equals(segment.getVersion());
return versionOk
&& taskLock.getDataSource().equals(segment.getDataSource())
&& taskLock.getInterval().contains(segment.getInterval());
}
}
);
if (!ok) {
return false;
}
}
return true;
}
}
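
SegmentInsertAction and SegmentNukeAction (above) now delegate their lock verification to this shared helper; the only difference between them is the allowOlderVersions flag. A condensed sketch of the shared call:

// Inserts require an exact version match; nukes accept any lock at or above
// the segment version (allowOlderVersions = true).
if (!toolbox.taskLockCoversSegments(task, segments, false /* true for nukes */)) {
  throw new ISE("Segments not covered by locks for task: %s", task.getId());
}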

View File

@ -13,7 +13,7 @@ public abstract class TaskConfig
@Config("druid.merger.rowFlushBoundary")
@Default("500000")
public abstract long getRowFlushBoundary();
public abstract int getDefaultRowFlushBoundary();
public File getTaskDir(final Task task) {
return new File(getBaseTaskDir(), task.getId());

View File

@ -35,7 +35,7 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.FireHydrant;
@ -45,11 +45,11 @@ import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.Sink;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Set;
@ -61,7 +61,7 @@ public class YeOldePlumberSchool implements PlumberSchool
{
private final Interval interval;
private final String version;
private final SegmentPusher segmentPusher;
private final DataSegmentPusher dataSegmentPusher;
private final File tmpSegmentDir;
private static final Logger log = new Logger(YeOldePlumberSchool.class);
@ -70,13 +70,13 @@ public class YeOldePlumberSchool implements PlumberSchool
public YeOldePlumberSchool(
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
@JacksonInject("segmentPusher") SegmentPusher segmentPusher,
@JacksonInject("segmentPusher") DataSegmentPusher dataSegmentPusher,
@JacksonInject("tmpSegmentDir") File tmpSegmentDir
)
{
this.interval = interval;
this.version = version;
this.segmentPusher = segmentPusher;
this.dataSegmentPusher = dataSegmentPusher;
this.tmpSegmentDir = tmpSegmentDir;
}
@ -120,13 +120,13 @@ public class YeOldePlumberSchool implements PlumberSchool
@Override
public void finishJob()
{
// The segment we will upload
File fileToUpload = null;
try {
// User should have persisted everything by now.
Preconditions.checkState(!theSink.swappable(), "All data must be persisted before finishing the job!");
// The segment we will upload
final File fileToUpload;
if(spilled.size() == 0) {
throw new IllegalStateException("Nothing indexed?");
} else if(spilled.size() == 1) {
@ -149,7 +149,7 @@ public class YeOldePlumberSchool implements PlumberSchool
.withVersion(version)
.withBinaryVersion(IndexIO.getVersionFromDir(fileToUpload));
segmentPusher.push(fileToUpload, segmentToUpload);
dataSegmentPusher.push(fileToUpload, segmentToUpload);
log.info(
"Uploaded segment[%s]",
@ -160,6 +160,17 @@ public class YeOldePlumberSchool implements PlumberSchool
log.warn(e, "Failed to merge and upload");
throw Throwables.propagate(e);
}
finally {
try {
if (fileToUpload != null) {
log.info("Deleting Index File[%s]", fileToUpload);
FileUtils.deleteDirectory(fileToUpload);
}
}
catch (IOException e) {
log.warn(e, "Error deleting directory[%s]", fileToUpload);
}
}
}
private void spillIfSwappable()

View File

@ -69,20 +69,13 @@ public abstract class AbstractTask implements Task
return dataSource;
}
@JsonProperty("interval")
@Override
public Optional<Interval> getFixedInterval()
{
return interval;
}
// Awesome hack to get around lack of serde for Optional<T>
// TODO Look into jackson-datatype-guava
@JsonProperty("interval")
private Interval getNullableIntervalForJackson()
{
return interval.orNull();
}
@Override
public TaskStatus preflight(TaskToolbox toolbox) throws Exception
{

View File

@ -50,11 +50,16 @@ public class IndexDeterminePartitionsTask extends AbstractTask
{
@JsonProperty
private final FirehoseFactory firehoseFactory;
@JsonProperty
private final Schema schema;
@JsonProperty
private final long targetPartitionSize;
@JsonProperty
private final int rowFlushBoundary;
private static final Logger log = new Logger(IndexTask.class);
@JsonCreator
@ -63,7 +68,8 @@ public class IndexDeterminePartitionsTask extends AbstractTask
@JsonProperty("interval") Interval interval,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("schema") Schema schema,
@JsonProperty("targetPartitionSize") long targetPartitionSize
@JsonProperty("targetPartitionSize") long targetPartitionSize,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
)
{
super(
@ -81,6 +87,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
this.firehoseFactory = firehoseFactory;
this.schema = schema;
this.targetPartitionSize = targetPartitionSize;
this.rowFlushBoundary = rowFlushBoundary;
}
@Override
@ -244,7 +251,8 @@ public class IndexDeterminePartitionsTask extends AbstractTask
schema.getAggregators(),
schema.getIndexGranularity(),
shardSpec
)
),
rowFlushBoundary
);
}
}

View File

@ -28,7 +28,7 @@ import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
@ -58,6 +58,9 @@ public class IndexGeneratorTask extends AbstractTask
@JsonProperty
private final Schema schema;
@JsonProperty
private final int rowFlushBoundary;
private static final Logger log = new Logger(IndexTask.class);
@JsonCreator
@ -65,7 +68,8 @@ public class IndexGeneratorTask extends AbstractTask
@JsonProperty("groupId") String groupId,
@JsonProperty("interval") Interval interval,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("schema") Schema schema
@JsonProperty("schema") Schema schema,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
)
{
super(
@ -83,6 +87,7 @@ public class IndexGeneratorTask extends AbstractTask
this.firehoseFactory = firehoseFactory;
this.schema = schema;
this.rowFlushBoundary = rowFlushBoundary;
}
@Override
@ -118,7 +123,7 @@ public class IndexGeneratorTask extends AbstractTask
// We need to track published segments.
final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<DataSegment>();
final SegmentPusher wrappedSegmentPusher = new SegmentPusher()
final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher()
{
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
@ -135,10 +140,15 @@ public class IndexGeneratorTask extends AbstractTask
final Plumber plumber = new YeOldePlumberSchool(
interval,
myLock.getVersion(),
wrappedSegmentPusher,
wrappedDataSegmentPusher,
tmpDir
).findPlumber(schema, metrics);
// rowFlushBoundary for this job
final int myRowFlushBoundary = this.rowFlushBoundary > 0
? rowFlushBoundary
: toolbox.getConfig().getDefaultRowFlushBoundary();
try {
while(firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
@ -157,7 +167,7 @@ public class IndexGeneratorTask extends AbstractTask
int numRows = sink.add(inputRow);
metrics.incrementProcessed();
if(numRows >= toolbox.getConfig().getRowFlushBoundary()) {
if(numRows >= myRowFlushBoundary) {
plumber.persist(firehose.commit());
}
} else {

View File

@ -42,11 +42,23 @@ import java.util.List;
public class IndexTask extends AbstractTask
{
@JsonProperty private final GranularitySpec granularitySpec;
@JsonProperty private final AggregatorFactory[] aggregators;
@JsonProperty private final QueryGranularity indexGranularity;
@JsonProperty private final long targetPartitionSize;
@JsonProperty private final FirehoseFactory firehoseFactory;
@JsonProperty
private final GranularitySpec granularitySpec;
@JsonProperty
private final AggregatorFactory[] aggregators;
@JsonProperty
private final QueryGranularity indexGranularity;
@JsonProperty
private final long targetPartitionSize;
@JsonProperty
private final FirehoseFactory firehoseFactory;
@JsonProperty
private final int rowFlushBoundary;
private static final Logger log = new Logger(IndexTask.class);
@ -57,7 +69,8 @@ public class IndexTask extends AbstractTask
@JsonProperty("aggregators") AggregatorFactory[] aggregators,
@JsonProperty("indexGranularity") QueryGranularity indexGranularity,
@JsonProperty("targetPartitionSize") long targetPartitionSize,
@JsonProperty("firehose") FirehoseFactory firehoseFactory
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
)
{
super(
@ -75,6 +88,7 @@ public class IndexTask extends AbstractTask
this.indexGranularity = indexGranularity;
this.targetPartitionSize = targetPartitionSize;
this.firehoseFactory = firehoseFactory;
this.rowFlushBoundary = rowFlushBoundary;
}
public List<Task> toSubtasks()
@ -95,7 +109,8 @@ public class IndexTask extends AbstractTask
indexGranularity,
new NoneShardSpec()
),
targetPartitionSize
targetPartitionSize,
rowFlushBoundary
)
);
} else {
@ -110,7 +125,8 @@ public class IndexTask extends AbstractTask
aggregators,
indexGranularity,
new NoneShardSpec()
)
),
rowFlushBoundary
)
);
}

View File

@ -22,6 +22,7 @@ package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
@ -31,27 +32,22 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.loading.SegmentPuller;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.merger.common.actions.SegmentListUsedAction;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.apache.commons.codec.digest.DigestUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -151,13 +147,7 @@ public abstract class MergeTask extends AbstractTask
// download segments to merge
final Map<String, SegmentPuller> segmentGetters = toolbox.getSegmentGetters(this);
final Map<DataSegment, File> gettedSegments = Maps.newHashMap();
for (final DataSegment segment : segments) {
Map<String, Object> loadSpec = segment.getLoadSpec();
SegmentPuller segmentPuller = segmentGetters.get(loadSpec.get("type"));
gettedSegments.put(segment, segmentPuller.getSegmentFiles(segment));
}
final Map<DataSegment, File> gettedSegments = toolbox.getSegments(this, segments);
// merge files together
final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged"));
@ -292,7 +282,11 @@ public abstract class MergeTask extends AbstractTask
)
);
return String.format("%s_%s", dataSource, DigestUtils.sha1Hex(segmentIDs).toLowerCase());
return String.format(
"%s_%s",
dataSource,
Hashing.sha1().hashString(segmentIDs, Charsets.UTF_8).toString().toLowerCase()
);
}
private static Interval computeMergedInterval(final List<DataSegment> segments)

View File

@ -49,11 +49,11 @@ import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.loading.S3SegmentKiller;
import com.metamx.druid.loading.S3SegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.LocalTaskActionClient;
import com.metamx.druid.merger.common.actions.TaskActionToolbox;
@ -103,8 +103,6 @@ import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
@ -467,9 +465,9 @@ public class IndexerCoordinatorNode extends RegisteringNode
public void initializeTaskToolbox()
{
if (taskToolbox == null) {
final SegmentPusher segmentPusher = new S3SegmentPusher(
final DataSegmentPusher dataSegmentPusher = new S3DataSegmentPusher(
s3Service,
configFactory.build(S3SegmentPusherConfig.class),
configFactory.build(S3DataSegmentPusherConfig.class),
jsonMapper
);
final SegmentKiller segmentKiller = new S3SegmentKiller(
@ -483,7 +481,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
),
emitter,
s3Service,
segmentPusher,
dataSegmentPusher,
segmentKiller,
jsonMapper
);

View File

@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator.http;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
@ -176,6 +177,9 @@ public class IndexerCoordinatorResource
public <T> Response doAction(final TaskAction<T> action)
{
final T ret = taskMasterLifecycle.getTaskToolbox().getTaskActionClient().submit(action);
return Response.ok().entity(ret).build();
final Map<String, Object> retMap = Maps.newHashMap();
retMap.put("result", ret);
return Response.ok().entity(retMap).build();
}
}
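For illustration (the payload shape depends on the submitted action): where a caller previously received the raw action result as the response entity, it now receives the result wrapped under a "result" key, e.g. {"result": ...}.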


@ -39,6 +39,9 @@ public abstract class WorkerConfig
@Config("druid.worker.version")
public abstract String getVersion();
@Config("druid.worker.masterService")
public abstract String getMasterService();
@Config("druid.worker.capacity")
public int getCapacity()
{


@ -35,12 +35,13 @@ import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.loading.S3SegmentKiller;
import com.metamx.druid.loading.S3SegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.RemoteTaskActionClient;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
@ -64,6 +65,8 @@ import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceProvider;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
@ -105,6 +108,8 @@ public class WorkerNode extends RegisteringNode
private WorkerConfig workerConfig = null;
private TaskToolbox taskToolbox = null;
private CuratorFramework curatorFramework = null;
private ServiceDiscovery serviceDiscovery = null;
private ServiceProvider coordinatorServiceProvider = null;
private WorkerCuratorCoordinator workerCuratorCoordinator = null;
private TaskMonitor taskMonitor = null;
private Server server = null;
@ -156,6 +161,18 @@ public class WorkerNode extends RegisteringNode
return this;
}
public WorkerNode setCoordinatorServiceProvider(ServiceProvider coordinatorServiceProvider)
{
this.coordinatorServiceProvider = coordinatorServiceProvider;
return this;
}
public WorkerNode setServiceDiscovery(ServiceDiscovery serviceDiscovery)
{
this.serviceDiscovery = serviceDiscovery;
return this;
}
public WorkerNode setWorkerCuratorCoordinator(WorkerCuratorCoordinator workerCuratorCoordinator)
{
this.workerCuratorCoordinator = workerCuratorCoordinator;
@ -175,10 +192,12 @@ public class WorkerNode extends RegisteringNode
initializeS3Service();
initializeMonitors();
initializeMergerConfig();
initializeCuratorFramework();
initializeServiceDiscovery();
initializeCoordinatorServiceProvider();
initializeTaskToolbox();
initializeJacksonInjections();
initializeJacksonSubtypes();
initializeCuratorFramework();
initializeCuratorCoordinator();
initializeTaskMonitor();
initializeServer();
@ -318,9 +337,9 @@ public class WorkerNode extends RegisteringNode
public void initializeTaskToolbox() throws S3ServiceException
{
if (taskToolbox == null) {
final SegmentPusher segmentPusher = new S3SegmentPusher(
final DataSegmentPusher dataSegmentPusher = new S3DataSegmentPusher(
s3Service,
configFactory.build(S3SegmentPusherConfig.class),
configFactory.build(S3DataSegmentPusherConfig.class),
jsonMapper
);
final SegmentKiller segmentKiller = new S3SegmentKiller(
@ -328,10 +347,10 @@ public class WorkerNode extends RegisteringNode
);
taskToolbox = new TaskToolbox(
taskConfig,
new RemoteTaskActionClient(httpClient, jsonMapper),
new RemoteTaskActionClient(httpClient, coordinatorServiceProvider, jsonMapper),
emitter,
s3Service,
segmentPusher,
dataSegmentPusher,
segmentKiller,
jsonMapper
);
@ -340,11 +359,36 @@ public class WorkerNode extends RegisteringNode
public void initializeCuratorFramework() throws IOException
{
final CuratorConfig curatorConfig = configFactory.build(CuratorConfig.class);
curatorFramework = Initialization.makeCuratorFrameworkClient(
curatorConfig,
lifecycle
);
if (curatorFramework == null) {
final CuratorConfig curatorConfig = configFactory.build(CuratorConfig.class);
curatorFramework = Initialization.makeCuratorFrameworkClient(
curatorConfig,
lifecycle
);
}
}
public void initializeServiceDiscovery() throws Exception
{
if (serviceDiscovery == null) {
final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class);
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
curatorFramework,
config,
lifecycle
);
}
}
public void initializeCoordinatorServiceProvider()
{
if (coordinatorServiceProvider == null) {
this.coordinatorServiceProvider = Initialization.makeServiceProvider(
workerConfig.getMasterService(),
serviceDiscovery,
lifecycle
);
}
}
public void initializeCuratorCoordinator()


@ -19,9 +19,10 @@
package com.metamx.druid.merger.common.task;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
import com.metamx.druid.client.DataSegment;
import org.apache.commons.codec.digest.DigestUtils;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@ -72,11 +73,12 @@ public class MergeTaskTest
@Test
public void testID()
{
final String desiredPrefix = "merge_foo_" + DigestUtils.sha1Hex(
final String desiredPrefix = "merge_foo_" + Hashing.sha1().hashString(
"2012-01-03T00:00:00.000Z_2012-01-05T00:00:00.000Z_V1_0"
+ "_2012-01-04T00:00:00.000Z_2012-01-06T00:00:00.000Z_V1_0"
+ "_2012-01-05T00:00:00.000Z_2012-01-07T00:00:00.000Z_V1_0"
) + "_";
, Charsets.UTF_8
).toString().toLowerCase() + "_";
Assert.assertEquals(
desiredPrefix,
testMergeTask.getId().substring(0, desiredPrefix.length())


@ -26,7 +26,8 @@ public class TaskSerdeTest
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
10000,
null
null,
-1
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
@ -52,7 +53,8 @@ public class TaskSerdeTest
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
new NoneShardSpec()
)
),
-1
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
@ -97,6 +99,7 @@ public class TaskSerdeTest
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);
System.out.println(json);
final Task task2 = jsonMapper.readValue(json, Task.class);
Assert.assertEquals(task.getId(), task2.getId());


@ -296,7 +296,7 @@ public class RemoteTaskRunnerTest
}
@Override
public long getRowFlushBoundary()
public int getDefaultRowFlushBoundary()
{
return 0;
}


@ -20,9 +20,9 @@ import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.SegmentPuller;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
@ -106,7 +106,7 @@ public class TaskLifecycleTest
}
@Override
public long getRowFlushBoundary()
public int getDefaultRowFlushBoundary()
{
return 50000;
}
@ -114,7 +114,7 @@ public class TaskLifecycleTest
new LocalTaskActionClient(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter())),
newMockEmitter(),
null, // s3 client
new SegmentPusher()
new DataSegmentPusher()
{
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
@ -134,7 +134,9 @@ public class TaskLifecycleTest
)
{
@Override
public Map<String, SegmentPuller> getSegmentGetters(Task task)
public Map<DataSegment, File> getSegments(
Task task, List<DataSegment> segments
) throws SegmentLoadingException
{
return ImmutableMap.of();
}
@ -180,7 +182,8 @@ public class TaskLifecycleTest
IR("2010-01-02T01", "a", "b", 2),
IR("2010-01-02T01", "a", "c", 1)
)
)
),
-1
);
final TaskStatus mergedStatus = runTask(indexTask);
@ -214,7 +217,8 @@ public class TaskLifecycleTest
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
10000,
newMockExceptionalFirehoseFactory()
newMockExceptionalFirehoseFactory(),
-1
);
final TaskStatus mergedStatus = runTask(indexTask);

pom.xml

@ -23,7 +23,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.2.8-SNAPSHOT</version>
<version>0.3.4-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
@ -175,7 +175,12 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.1.3</version>
<version>2.1.4-mmx-2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>2.1.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
@ -192,6 +197,16 @@
<artifactId>jackson-jaxrs-json-provider</artifactId>
<version>2.1.3</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.11</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.11</version>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>


@ -18,8 +18,7 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-realtime</artifactId>
@ -29,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.2.8-SNAPSHOT</version>
<version>0.3.4-SNAPSHOT</version>
</parent>
<properties>


@ -19,6 +19,7 @@
package com.metamx.druid.realtime;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
@ -31,6 +32,7 @@ public class FireDepartmentConfig
private final int maxRowsInMemory;
private final Period intermediatePersistPeriod;
@JsonCreator
public FireDepartmentConfig(
@JsonProperty("maxRowsInMemory") int maxRowsInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod


@ -46,9 +46,11 @@ import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.S3SegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.LocalDataSegmentPusher;
import com.metamx.druid.loading.LocalDataSegmentPusherConfig;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
@ -86,7 +88,7 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
private final Map<String, Object> injectablesMap = Maps.newLinkedHashMap();
private MetadataUpdater metadataUpdater = null;
private SegmentPusher segmentPusher = null;
private DataSegmentPusher dataSegmentPusher = null;
private List<FireDepartment> fireDepartments = null;
private ServerView view = null;
@ -117,10 +119,10 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
return this;
}
public RealtimeNode setSegmentPusher(SegmentPusher segmentPusher)
public RealtimeNode setDataSegmentPusher(DataSegmentPusher dataSegmentPusher)
{
Preconditions.checkState(this.segmentPusher == null, "Cannot set segmentPusher once it has already been set.");
this.segmentPusher = segmentPusher;
Preconditions.checkState(this.dataSegmentPusher == null, "Cannot set segmentPusher once it has already been set.");
this.dataSegmentPusher = dataSegmentPusher;
return this;
}
@ -144,10 +146,10 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
return metadataUpdater;
}
public SegmentPusher getSegmentPusher()
public DataSegmentPusher getDataSegmentPusher()
{
initializeSegmentPusher();
return segmentPusher;
return dataSegmentPusher;
}
public List<FireDepartment> getFireDepartments()
@ -220,7 +222,7 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
}
injectables.put("queryRunnerFactoryConglomerate", getConglomerate());
injectables.put("segmentPusher", segmentPusher);
injectables.put("segmentPusher", dataSegmentPusher);
injectables.put("metadataUpdater", metadataUpdater);
injectables.put("serverView", view);
injectables.put("serviceEmitter", getEmitter());
@ -256,22 +258,32 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
private void initializeSegmentPusher()
{
if (segmentPusher == null) {
if (dataSegmentPusher == null) {
final Properties props = getProps();
final RestS3Service s3Client;
try {
s3Client = new RestS3Service(
new AWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
if (Boolean.parseBoolean(props.getProperty("druid.pusher.local", "false"))) {
dataSegmentPusher = new LocalDataSegmentPusher(
getConfigFactory().build(LocalDataSegmentPusherConfig.class), getJsonMapper()
);
}
catch (S3ServiceException e) {
throw Throwables.propagate(e);
}
else {
segmentPusher = new S3SegmentPusher(s3Client, getConfigFactory().build(S3SegmentPusherConfig.class), getJsonMapper());
final RestS3Service s3Client;
try {
s3Client = new RestS3Service(
new AWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
);
}
catch (S3ServiceException e) {
throw Throwables.propagate(e);
}
dataSegmentPusher = new S3DataSegmentPusher(
s3Client, getConfigFactory().build(S3DataSegmentPusherConfig.class), getJsonMapper()
);
}
}
}
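For illustration (hypothetical property values): a realtime node started with druid.pusher.local=true and druid.pusher.local.storageDirectory=/tmp/druid/localStorage would push segments to local disk, while leaving druid.pusher.local unset (it defaults to false) keeps the S3 path, which additionally requires com.metamx.aws.accessKey and com.metamx.aws.secretKey.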


@ -48,7 +48,7 @@ import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.query.MetricsEmittingQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
@ -95,7 +95,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
private volatile QueryRunnerFactoryConglomerate conglomerate = null;
private volatile SegmentPusher segmentPusher = null;
private volatile DataSegmentPusher dataSegmentPusher = null;
private volatile MetadataUpdater metadataUpdater = null;
private volatile ServerView serverView = null;
private ServiceEmitter emitter;
@ -130,9 +130,9 @@ public class RealtimePlumberSchool implements PlumberSchool
}
@JacksonInject("segmentPusher")
public void setSegmentPusher(SegmentPusher segmentPusher)
public void setDataSegmentPusher(DataSegmentPusher dataSegmentPusher)
{
this.segmentPusher = segmentPusher;
this.dataSegmentPusher = dataSegmentPusher;
}
@JacksonInject("metadataUpdater")
@ -307,7 +307,7 @@ public class RealtimePlumberSchool implements PlumberSchool
}
}
final File mergedFile;
File mergedFile = null;
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
@ -325,7 +325,7 @@ public class RealtimePlumberSchool implements PlumberSchool
QueryableIndex index = IndexIO.loadIndex(mergedFile);
DataSegment segment = segmentPusher.push(
DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
);
@ -337,6 +337,19 @@ public class RealtimePlumberSchool implements PlumberSchool
.addData("interval", interval)
.emit();
}
if (mergedFile != null) {
try {
if (mergedFile != null) {
log.info("Deleting Index File[%s]", mergedFile);
FileUtils.deleteDirectory(mergedFile);
}
}
catch (IOException e) {
log.warn(e, "Error deleting directory[%s]", mergedFile);
}
}
}
}
);
@ -512,7 +525,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private void verifyState()
{
Preconditions.checkNotNull(conglomerate, "must specify a queryRunnerFactoryConglomerate to do this action.");
Preconditions.checkNotNull(segmentPusher, "must specify a segmentPusher to do this action.");
Preconditions.checkNotNull(dataSegmentPusher, "must specify a segmentPusher to do this action.");
Preconditions.checkNotNull(metadataUpdater, "must specify a metadataUpdater to do this action.");
Preconditions.checkNotNull(serverView, "must specify a serverView to do this action.");
Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action.");


@ -20,20 +20,22 @@
package com.metamx.druid.realtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
/**
* A placeholder class to make the move of the SegmentPushers to a new package backwards compatible
*
* Exists in 0.2, can be removed from 0.3 on
*/
@Deprecated
public class S3SegmentPusher extends com.metamx.druid.loading.S3SegmentPusher implements SegmentPusher
public class S3SegmentPusher extends com.metamx.druid.loading.S3DataSegmentPusher implements DataSegmentPusher
{
public S3SegmentPusher(
RestS3Service s3Client,
S3SegmentPusherConfig config,
S3DataSegmentPusherConfig config,
ObjectMapper jsonMapper
)
{


@ -18,8 +18,7 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-server</artifactId>
@ -29,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.2.8-SNAPSHOT</version>
<version>0.3.4-SNAPSHOT</version>
</parent>
<dependencies>


@ -30,7 +30,7 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.collect.CountingMap;
import com.metamx.druid.index.Segment;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.StorageAdapterLoadingException;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.partition.PartitionHolder;
import com.metamx.druid.query.BySegmentQueryRunner;
@ -104,24 +104,24 @@ public class ServerManager implements QuerySegmentWalker
}
}
public void loadSegment(final DataSegment segment) throws StorageAdapterLoadingException
public void loadSegment(final DataSegment segment) throws SegmentLoadingException
{
final Segment adapter;
try {
adapter = segmentLoader.getSegment(segment);
}
catch (StorageAdapterLoadingException e) {
catch (SegmentLoadingException e) {
try {
segmentLoader.cleanup(segment);
}
catch (StorageAdapterLoadingException e1) {
catch (SegmentLoadingException e1) {
// ignore
}
throw e;
}
if (adapter == null) {
throw new StorageAdapterLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec());
throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec());
}
synchronized (lock) {
@ -139,7 +139,7 @@ public class ServerManager implements QuerySegmentWalker
);
if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
log.info("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier());
throw new StorageAdapterLoadingException("Segment already exists[%s]", segment.getIdentifier());
throw new SegmentLoadingException("Segment already exists[%s]", segment.getIdentifier());
}
loadedIntervals.add(
@ -154,7 +154,7 @@ public class ServerManager implements QuerySegmentWalker
}
}
public void dropSegment(final DataSegment segment) throws StorageAdapterLoadingException
public void dropSegment(final DataSegment segment) throws SegmentLoadingException
{
String dataSource = segment.getDataSource();
synchronized (lock) {


@ -29,7 +29,7 @@ import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.loading.StorageAdapterLoadingException;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.phonebook.PhoneBook;
@ -245,14 +245,14 @@ public class ZkCoordinator implements DataSegmentChangeHandler
}
catch (IOException e) {
removeSegment(segment);
throw new StorageAdapterLoadingException(
throw new SegmentLoadingException(
"Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
}
yp.announce(servedSegmentsLocation, segment.getIdentifier(), segment);
}
catch (StorageAdapterLoadingException e) {
catch (SegmentLoadingException e) {
log.error(e, "Failed to load segment[%s]", segment);
emitter.emit(
new AlertEvent.Builder().build(


@ -39,7 +39,7 @@ import com.metamx.druid.coordination.ZkCoordinatorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.QueryableLoaderConfig;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.metrics.ServerMonitor;
import com.metamx.druid.query.MetricsEmittingExecutorService;
@ -172,7 +172,7 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
);
setSegmentLoader(
ServerInit.makeDefaultQueryableLoader(s3Client, getConfigFactory().build(QueryableLoaderConfig.class))
ServerInit.makeDefaultQueryableLoader(s3Client, getConfigFactory().build(SegmentLoaderConfig.class))
);
}
catch (S3ServiceException e) {


@ -26,16 +26,16 @@ import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.DruidProcessingConfig;
import com.metamx.druid.loading.DelegatingSegmentLoader;
import com.metamx.druid.loading.LocalDataSegmentPuller;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
import com.metamx.druid.loading.QueryableIndexFactory;
import com.metamx.druid.loading.S3SegmentPuller;
import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.query.group.GroupByQueryEngine;
import com.metamx.druid.query.group.GroupByQueryEngineConfig;
import com.metamx.druid.Query;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.loading.QueryableLoaderConfig;
import com.metamx.druid.loading.S3ZippedSegmentPuller;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.group.GroupByQuery;
@ -64,24 +64,22 @@ public class ServerInit
public static SegmentLoader makeDefaultQueryableLoader(
RestS3Service s3Client,
QueryableLoaderConfig config
SegmentLoaderConfig config
)
{
DelegatingSegmentLoader delegateLoader = new DelegatingSegmentLoader();
final S3SegmentPuller segmentGetter = new S3SegmentPuller(s3Client, config);
final S3ZippedSegmentPuller zippedGetter = new S3ZippedSegmentPuller(s3Client, config);
final QueryableIndexFactory factory;
if ("mmap".equals(config.getQueryableFactoryType())) {
factory = new MMappedQueryableIndexFactory();
} else {
throw new ISE("Unknown queryableFactoryType[%s]", config.getQueryableFactoryType());
}
final S3DataSegmentPuller segmentGetter = new S3DataSegmentPuller(s3Client);
final QueryableIndexFactory factory = new MMappedQueryableIndexFactory();
SingleSegmentLoader s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config);
SingleSegmentLoader localSegmentLoader = new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config);
delegateLoader.setLoaderTypes(
ImmutableMap.<String, SegmentLoader>builder()
.put("s3", new SingleSegmentLoader(segmentGetter, factory))
.put("s3_zip", new SingleSegmentLoader(zippedGetter, factory))
.put("s3", s3segmentLoader)
.put("s3_zip", s3segmentLoader)
.put("local", localSegmentLoader)
.build()
);
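For illustration (hypothetical values): a loadSpec such as {"type": "local", "path": "/tmp/druid/segments/someSegment/index.zip"} is routed to localSegmentLoader, while specs of type "s3" or "s3_zip" (carrying "bucket" and "key" entries) are both served by the same s3segmentLoader.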


@ -0,0 +1,47 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.metamx.druid.client.DataSegment;
import java.io.File;
/**
*/
public interface DataSegmentPuller
{
/**
* Pull down segment files for the given DataSegment and put them in the given directory.
*
* @param segment The segment to pull down files for
* @param dir The directory to store the files in
* @throws SegmentLoadingException if there are any errors
*/
public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException;
/**
* Returns the last modified time of the given segment.
*
* @param segment The segment to check the last modified time for
* @return the last modified time in millis from the epoch
* @throws SegmentLoadingException if there are any errors
*/
public long getLastModified(DataSegment segment) throws SegmentLoadingException;
}
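As an illustration of the contract above, a minimal hypothetical implementation might look like the following (the class name and behavior are assumptions made purely for illustration; it performs no real pull and only guarantees that the target directory exists):

package com.metamx.druid.loading;

import com.metamx.druid.client.DataSegment;

import java.io.File;

public class NoopDataSegmentPuller implements DataSegmentPuller
{
  @Override
  public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException
  {
    // Nothing is downloaded; the files are assumed to have been staged in dir already.
    if (!dir.exists() && !dir.mkdirs()) {
      throw new SegmentLoadingException("Could not create directory[%s]", dir);
    }
  }

  @Override
  public long getLastModified(DataSegment segment)
  {
    // There is no backing store to consult in this sketch, so report the epoch.
    return 0L;
  }
}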


@ -24,7 +24,7 @@ import com.metamx.druid.client.DataSegment;
import java.io.File;
import java.io.IOException;
public interface SegmentPusher
public interface DataSegmentPusher
{
public DataSegment push(File file, DataSegment segment) throws IOException;
}


@ -19,15 +19,26 @@
package com.metamx.druid.loading;
import com.google.common.base.Joiner;
import com.metamx.druid.client.DataSegment;
import java.io.File;
import java.util.Map;
/**
*/
public interface SegmentPuller
public class DataSegmentPusherUtil
{
public File getSegmentFiles(DataSegment loadSpec) throws StorageAdapterLoadingException;
public boolean cleanSegmentFiles(DataSegment loadSpec) throws StorageAdapterLoadingException;
private static final Joiner JOINER = Joiner.on("/").skipNulls();
public static String getStorageDir(DataSegment segment)
{
return JOINER.join(
segment.getDataSource(),
String.format(
"%s_%s",
segment.getInterval().getStart(),
segment.getInterval().getEnd()
),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
);
}
}
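For illustration (hypothetical segment values): a segment of dataSource "wikipedia" covering 2013-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z with version "v1" and partition 0 would map to the storage directory
wikipedia/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/v1/0
which both LocalDataSegmentPusher and S3DataSegmentPusher below reuse, keeping the local and S3 layouts in sync.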


@ -44,24 +44,24 @@ public class DelegatingSegmentLoader implements SegmentLoader
}
@Override
public Segment getSegment(DataSegment segment) throws StorageAdapterLoadingException
public Segment getSegment(DataSegment segment) throws SegmentLoadingException
{
return getLoader(segment.getLoadSpec()).getSegment(segment);
}
@Override
public void cleanup(DataSegment segment) throws StorageAdapterLoadingException
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
getLoader(segment.getLoadSpec()).cleanup(segment);
}
private SegmentLoader getLoader(Map<String, Object> loadSpec) throws StorageAdapterLoadingException
private SegmentLoader getLoader(Map<String, Object> loadSpec) throws SegmentLoadingException
{
String type = MapUtils.getString(loadSpec, "type");
SegmentLoader loader = loaderTypes.get(type);
if (loader == null) {
throw new StorageAdapterLoadingException("Unknown loader type[%s]. Known types are %s", type, loaderTypes.keySet());
throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, loaderTypes.keySet());
}
return loader;
}


@ -0,0 +1,105 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.google.common.io.Files;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.utils.CompressionUtils;
import java.io.File;
import java.io.IOException;
import java.util.Map;
/**
*/
public class LocalDataSegmentPuller implements DataSegmentPuller
{
private static final Logger log = new Logger(LocalDataSegmentPuller.class);
@Override
public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException
{
final File path = getFile(segment);
if (path.isDirectory()) {
if (path.equals(dir)) {
log.info("Asked to load [%s] into itself, done!", dir);
return;
}
log.info("Copying files from [%s] to [%s]", path, dir);
File file = null;
try {
final File[] files = path.listFiles();
for (int i = 0; i < files.length; ++i) {
file = files[i];
Files.copy(file, new File(dir, file.getName()));
}
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to copy file[%s].", file);
}
} else {
if (!path.getName().endsWith(".zip")) {
throw new SegmentLoadingException("File is not a zip file[%s]", path);
}
log.info("Unzipping local file[%s] to [%s]", path, dir);
try {
CompressionUtils.unzip(path, dir);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to unzip file[%s]", path);
}
}
}
@Override
public long getLastModified(DataSegment segment) throws SegmentLoadingException
{
final File file = getFile(segment);
long lastModified = Long.MAX_VALUE;
if (file.isDirectory()) {
for (File childFile : file.listFiles()) {
lastModified = Math.min(childFile.lastModified(), lastModified);
}
}
else {
lastModified = file.lastModified();
}
return lastModified;
}
private File getFile(DataSegment segment) throws SegmentLoadingException
{
final Map<String, Object> loadSpec = segment.getLoadSpec();
final File path = new File(MapUtils.getString(loadSpec, "path"));
if (!path.exists()) {
throw new SegmentLoadingException("Asked to load path[%s], but it doesn't exist.", path);
}
return path;
}
}


@ -0,0 +1,96 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.utils.CompressionUtils;
import java.io.File;
import java.io.IOException;
/**
*/
public class LocalDataSegmentPusher implements DataSegmentPusher
{
private static final Logger log = new Logger(LocalDataSegmentPusher.class);
private final LocalDataSegmentPusherConfig config;
private final ObjectMapper jsonMapper;
public LocalDataSegmentPusher(
LocalDataSegmentPusherConfig config,
ObjectMapper jsonMapper
)
{
this.config = config;
this.jsonMapper = jsonMapper;
}
@Override
public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOException
{
File outDir = new File(config.getStorageDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
if (dataSegmentFile.equals(outDir)) {
long size = 0;
for (File file : dataSegmentFile.listFiles()) {
size += file.length();
}
return createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outDir))
.withSize(size)
.withBinaryVersion(IndexIO.getVersionFromDir(dataSegmentFile)),
outDir
);
}
outDir.mkdirs();
File outFile = new File(outDir, "index.zip");
log.info("Compressing files from[%s] to [%s]", dataSegmentFile, outFile);
long size = CompressionUtils.zip(dataSegmentFile, outFile);
return createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outFile))
.withSize(size)
.withBinaryVersion(IndexIO.getVersionFromDir(dataSegmentFile)),
outDir
);
}
private DataSegment createDescriptorFile(DataSegment segment, File outDir) throws IOException
{
File descriptorFile = new File(outDir, "descriptor.json");
log.info("Creating descriptor file at[%s]", descriptorFile);
Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
return segment;
}
private ImmutableMap<String, Object> makeLoadSpec(File outFile)
{
return ImmutableMap.<String, Object>of("type", "local", "path", outFile.toString());
}
}


@ -25,8 +25,8 @@ import java.io.File;
/**
*/
public abstract class S3SegmentGetterConfig
public abstract class LocalDataSegmentPusherConfig
{
@Config("druid.paths.indexCache")
public abstract File getCacheDirectory();
@Config("druid.pusher.local.storageDirectory")
public abstract File getStorageDirectory();
}


@ -34,7 +34,7 @@ public class MMappedQueryableIndexFactory implements QueryableIndexFactory
private static final Logger log = new Logger(MMappedQueryableIndexFactory.class);
@Override
public QueryableIndex factorize(File parentDir) throws StorageAdapterLoadingException
public QueryableIndex factorize(File parentDir) throws SegmentLoadingException
{
try {
if (! IndexIO.canBeMapped(parentDir)) {
@ -46,11 +46,11 @@ public class MMappedQueryableIndexFactory implements QueryableIndexFactory
IndexIO.storeLatest(IndexIO.readIndex(parentDir), canBeMappedDir);
if (! IndexIO.canBeMapped(canBeMappedDir)) {
throw new StorageAdapterLoadingException("WTF!? newly written file[%s] cannot be mapped!?", canBeMappedDir);
throw new SegmentLoadingException("WTF!? newly written file[%s] cannot be mapped!?", canBeMappedDir);
}
for (File file : canBeMappedDir.listFiles()) {
if (! file.renameTo(new File(parentDir, file.getName()))) {
throw new StorageAdapterLoadingException("Couldn't rename[%s] to [%s]", canBeMappedDir, parentDir);
throw new SegmentLoadingException("Couldn't rename[%s] to [%s]", canBeMappedDir, parentDir);
}
}
FileUtils.deleteDirectory(canBeMappedDir);
@ -66,7 +66,7 @@ public class MMappedQueryableIndexFactory implements QueryableIndexFactory
catch (IOException e2) {
log.error(e, "Problem deleting parentDir[%s]", parentDir);
}
throw new StorageAdapterLoadingException(e, e.getMessage());
throw new SegmentLoadingException(e, "%s", e.getMessage());
}
}
}


@ -27,5 +27,5 @@ import java.io.File;
*/
public interface QueryableIndexFactory
{
public QueryableIndex factorize(File parentDir) throws StorageAdapterLoadingException;
public QueryableIndex factorize(File parentDir) throws SegmentLoadingException;
}


@ -0,0 +1,170 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.common.s3.S3Utils;
import com.metamx.druid.utils.CompressionUtils;
import org.apache.commons.io.FileUtils;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.zip.GZIPInputStream;
/**
*/
public class S3DataSegmentPuller implements DataSegmentPuller
{
private static final Logger log = new Logger(S3DataSegmentPuller.class);
private static final String BUCKET = "bucket";
private static final String KEY = "key";
private final RestS3Service s3Client;
@Inject
public S3DataSegmentPuller(
RestS3Service s3Client
)
{
this.s3Client = s3Client;
}
@Override
public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
{
S3Coords s3Coords = new S3Coords(segment);
log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
if (!isObjectInBucket(s3Coords)) {
throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords);
}
if (!outDir.exists()) {
outDir.mkdirs();
}
if (!outDir.isDirectory()) {
throw new ISE("outDir[%s] must be a directory.", outDir);
}
long startTime = System.currentTimeMillis();
S3Object s3Obj = null;
try {
s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path);
InputStream in = null;
try {
in = s3Obj.getDataInputStream();
final String key = s3Obj.getKey();
if (key.endsWith(".zip")) {
CompressionUtils.unzip(in, outDir);
} else if (key.endsWith(".gz")) {
final File outFile = new File(outDir, toFilename(key, ".gz"));
ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile));
} else {
ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, ""))));
}
log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime);
}
catch (IOException e) {
FileUtils.deleteDirectory(outDir);
throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj);
}
finally {
Closeables.closeQuietly(in);
}
}
catch (Exception e) {
throw new SegmentLoadingException(e, e.getMessage());
}
finally {
S3Utils.closeStreamsQuietly(s3Obj);
}
}
private String toFilename(String key, final String suffix)
{
String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/'
filename = filename.substring(0, filename.length() - suffix.length()); // remove the suffix from the end
return filename;
}
private boolean isObjectInBucket(S3Coords coords) throws SegmentLoadingException
{
try {
return s3Client.isObjectInBucket(coords.bucket, coords.path);
}
catch (ServiceException e) {
throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
}
}
@Override
public long getLastModified(DataSegment segment) throws SegmentLoadingException
{
S3Coords coords = new S3Coords(segment);
try {
S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path);
return objDetails.getLastModifiedDate().getTime();
}
catch (S3ServiceException e) {
throw new SegmentLoadingException(e, e.getMessage());
}
}
private static class S3Coords
{
String bucket;
String path;
public S3Coords(DataSegment segment)
{
Map<String, Object> loadSpec = segment.getLoadSpec();
bucket = MapUtils.getString(loadSpec, BUCKET);
path = MapUtils.getString(loadSpec, KEY);
if (path.startsWith("/")) {
path = path.substring(1);
}
}
public String toString()
{
return String.format("s3://%s/%s", bucket, path);
}
}
}


@ -22,38 +22,34 @@ package com.metamx.druid.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closeables;
import com.metamx.common.ISE;
import com.metamx.common.StreamUtils;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.utils.CompressionUtils;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.acl.gs.GSAccessControlList;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import java.io.*;
import java.io.File;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
public class S3SegmentPusher implements SegmentPusher
public class S3DataSegmentPusher implements DataSegmentPusher
{
private static final EmittingLogger log = new EmittingLogger(S3SegmentPusher.class);
private static final EmittingLogger log = new EmittingLogger(S3DataSegmentPusher.class);
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private final RestS3Service s3Client;
private final S3SegmentPusherConfig config;
private final S3DataSegmentPusherConfig config;
private final ObjectMapper jsonMapper;
public S3SegmentPusher(
RestS3Service s3Client,
S3SegmentPusherConfig config,
ObjectMapper jsonMapper
public S3DataSegmentPusher(
RestS3Service s3Client,
S3DataSegmentPusherConfig config,
ObjectMapper jsonMapper
)
{
this.s3Client = s3Client;
@ -67,35 +63,11 @@ public class S3SegmentPusher implements SegmentPusher
log.info("Uploading [%s] to S3", indexFilesDir);
String outputKey = JOINER.join(
config.getBaseKey().isEmpty() ? null : config.getBaseKey(),
segment.getDataSource(),
String.format(
"%s_%s",
segment.getInterval().getStart(),
segment.getInterval().getEnd()
),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
DataSegmentPusherUtil.getStorageDir(segment)
);
long indexSize = 0;
final File zipOutFile = File.createTempFile("druid", "index.zip");
ZipOutputStream zipOut = null;
try {
zipOut = new ZipOutputStream(new FileOutputStream(zipOutFile));
File[] indexFiles = indexFilesDir.listFiles();
for (File indexFile : indexFiles) {
log.info("Adding indexFile[%s] with size[%,d]. Total size[%,d]", indexFile, indexFile.length(), indexSize);
if (indexFile.length() >= Integer.MAX_VALUE) {
throw new ISE("indexFile[%s] too large [%,d]", indexFile, indexFile.length());
}
zipOut.putNextEntry(new ZipEntry(indexFile.getName()));
IOUtils.copy(new FileInputStream(indexFile), zipOut);
indexSize += indexFile.length();
}
}
finally {
Closeables.closeQuietly(zipOut);
}
long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
try {
S3Object toPush = new S3Object(zipOutFile);
@ -119,7 +91,7 @@ public class S3SegmentPusher implements SegmentPusher
.withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));
File descriptorFile = File.createTempFile("druid", "descriptor.json");
StreamUtils.copyToFileAndClose(new ByteArrayInputStream(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
S3Object descriptorObject = new S3Object(descriptorFile);
descriptorObject.setBucketName(outputBucket);
descriptorObject.setKey(outputKey + "/descriptor.json");
@ -128,9 +100,6 @@ public class S3SegmentPusher implements SegmentPusher
log.info("Pushing %s", descriptorObject);
s3Client.putObject(outputBucket, descriptorObject);
log.info("Deleting Index File[%s]", indexFilesDir);
FileUtils.deleteDirectory(indexFilesDir);
log.info("Deleting zipped index File[%s]", zipOutFile);
zipOutFile.delete();


@ -24,7 +24,7 @@ import org.skife.config.Default;
/**
*/
public abstract class S3SegmentPusherConfig
public abstract class S3DataSegmentPusherConfig
{
@Config("druid.pusher.s3.bucket")
public abstract String getBucket();


@ -1,180 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.StreamUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.common.s3.S3Utils;
import org.apache.commons.io.FileUtils;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.joda.time.DateTime;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.zip.GZIPInputStream;
/**
*/
public class S3SegmentPuller implements SegmentPuller
{
private static final Logger log = new Logger(S3SegmentPuller.class);
private static final long DEFAULT_TIMEOUT = 5 * 60 * 1000;
private static final String BUCKET = "bucket";
private static final String KEY = "key";
private final RestS3Service s3Client;
private final S3SegmentGetterConfig config;
@Inject
public S3SegmentPuller(
RestS3Service s3Client,
S3SegmentGetterConfig config
)
{
this.s3Client = s3Client;
this.config = config;
}
@Override
public File getSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException
{
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
log.info("Loading index at path[s3://%s/%s]", s3Bucket, s3Path);
S3Object s3Obj = null;
File tmpFile = null;
try {
if (!s3Client.isObjectInBucket(s3Bucket, s3Path)) {
throw new StorageAdapterLoadingException("IndexFile[s3://%s/%s] does not exist.", s3Bucket, s3Path);
}
File cacheFile = new File(config.getCacheDirectory(), computeCacheFilePath(s3Bucket, s3Path));
if (cacheFile.exists()) {
S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(s3Bucket), s3Path);
DateTime cacheFileLastModified = new DateTime(cacheFile.lastModified());
DateTime s3ObjLastModified = new DateTime(objDetails.getLastModifiedDate().getTime());
if (cacheFileLastModified.isAfter(s3ObjLastModified)) {
log.info(
"Found cacheFile[%s] with modified[%s], which is after s3Obj[%s]. Using.",
cacheFile,
cacheFileLastModified,
s3ObjLastModified
);
return cacheFile.getParentFile();
}
FileUtils.deleteDirectory(cacheFile.getParentFile());
}
long currTime = System.currentTimeMillis();
tmpFile = File.createTempFile(s3Bucket, new DateTime().toString());
log.info(
"Downloading file[s3://%s/%s] to local tmpFile[%s] for cacheFile[%s]",
s3Bucket, s3Path, tmpFile, cacheFile
);
s3Obj = s3Client.getObject(new S3Bucket(s3Bucket), s3Path);
StreamUtils.copyToFileAndClose(s3Obj.getDataInputStream(), tmpFile, DEFAULT_TIMEOUT);
final long downloadEndTime = System.currentTimeMillis();
log.info("Download of file[%s] completed in %,d millis", cacheFile, downloadEndTime - currTime);
if (!cacheFile.getParentFile().mkdirs()) {
log.info("Unable to make parent file[%s]", cacheFile.getParentFile());
}
cacheFile.delete();
if (s3Path.endsWith("gz")) {
log.info("Decompressing file[%s] to [%s]", tmpFile, cacheFile);
StreamUtils.copyToFileAndClose(
new GZIPInputStream(new FileInputStream(tmpFile)),
cacheFile
);
if (!tmpFile.delete()) {
log.error("Could not delete tmpFile[%s].", tmpFile);
}
} else {
log.info("Rename tmpFile[%s] to cacheFile[%s]", tmpFile, cacheFile);
if (!tmpFile.renameTo(cacheFile)) {
log.warn("Error renaming tmpFile[%s] to cacheFile[%s]. Copying instead.", tmpFile, cacheFile);
StreamUtils.copyToFileAndClose(new FileInputStream(tmpFile), cacheFile);
if (!tmpFile.delete()) {
log.error("Could not delete tmpFile[%s].", tmpFile);
}
}
}
long endTime = System.currentTimeMillis();
log.info("Local processing of file[%s] done in %,d millis", cacheFile, endTime - downloadEndTime);
return cacheFile.getParentFile();
}
catch (Exception e) {
throw new StorageAdapterLoadingException(e, e.getMessage());
}
finally {
S3Utils.closeStreamsQuietly(s3Obj);
if (tmpFile != null && tmpFile.exists()) {
log.warn("Deleting tmpFile[%s] in finally block. Why?", tmpFile);
tmpFile.delete();
}
}
}
private String computeCacheFilePath(String s3Bucket, String s3Path)
{
return String.format(
"%s/%s", s3Bucket, s3Path.endsWith("gz") ? s3Path.substring(0, s3Path.length() - ".gz".length()) : s3Path
);
}
@Override
public boolean cleanSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException
{
Map<String, Object> loadSpec = segment.getLoadSpec();
File cacheFile = new File(
config.getCacheDirectory(),
computeCacheFilePath(MapUtils.getString(loadSpec, BUCKET), MapUtils.getString(loadSpec, KEY))
);
try {
final File parentFile = cacheFile.getParentFile();
log.info("Recursively deleting file[%s]", parentFile);
FileUtils.deleteDirectory(parentFile);
}
catch (IOException e) {
throw new StorageAdapterLoadingException(e, e.getMessage());
}
return true;
}
}


@ -1,187 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.google.common.io.Closeables;
import com.metamx.common.MapUtils;
import com.metamx.common.StreamUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.common.s3.S3Utils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.joda.time.DateTime;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
/**
*/
public class S3ZippedSegmentPuller implements SegmentPuller
{
private static final Logger log = new Logger(S3ZippedSegmentPuller.class);
private static final String BUCKET = "bucket";
private static final String KEY = "key";
private final RestS3Service s3Client;
private final S3SegmentGetterConfig config;
public S3ZippedSegmentPuller(
RestS3Service s3Client,
S3SegmentGetterConfig config
)
{
this.s3Client = s3Client;
this.config = config;
}
@Override
public File getSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException
{
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
if (s3Path.startsWith("/")) {
s3Path = s3Path.substring(1);
}
log.info("Loading index at path[s3://%s/%s]", s3Bucket, s3Path);
S3Object s3Obj = null;
File tmpFile = null;
try {
if (!s3Client.isObjectInBucket(s3Bucket, s3Path)) {
throw new StorageAdapterLoadingException("IndexFile[s3://%s/%s] does not exist.", s3Bucket, s3Path);
}
File cacheFile = new File(config.getCacheDirectory(), computeCacheFilePath(s3Bucket, s3Path));
if (cacheFile.exists()) {
S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(s3Bucket), s3Path);
DateTime cacheFileLastModified = new DateTime(cacheFile.lastModified());
DateTime s3ObjLastModified = new DateTime(objDetails.getLastModifiedDate().getTime());
if (cacheFileLastModified.isAfter(s3ObjLastModified)) {
log.info(
"Found cacheFile[%s] with modified[%s], which is after s3Obj[%s]. Using.",
cacheFile,
cacheFileLastModified,
s3ObjLastModified
);
return cacheFile;
}
FileUtils.deleteDirectory(cacheFile);
}
long currTime = System.currentTimeMillis();
tmpFile = File.createTempFile(s3Bucket, new DateTime().toString());
log.info(
"Downloading file[s3://%s/%s] to local tmpFile[%s] for cacheFile[%s]",
s3Bucket, s3Path, tmpFile, cacheFile
);
s3Obj = s3Client.getObject(new S3Bucket(s3Bucket), s3Path);
StreamUtils.copyToFileAndClose(s3Obj.getDataInputStream(), tmpFile);
final long downloadEndTime = System.currentTimeMillis();
log.info("Download of file[%s] completed in %,d millis", cacheFile, downloadEndTime - currTime);
if (cacheFile.exists()) {
FileUtils.deleteDirectory(cacheFile);
}
cacheFile.mkdirs();
ZipInputStream zipIn = null;
OutputStream out = null;
ZipEntry entry = null;
try {
zipIn = new ZipInputStream(new BufferedInputStream(new FileInputStream(tmpFile)));
while ((entry = zipIn.getNextEntry()) != null) {
out = new FileOutputStream(new File(cacheFile, entry.getName()));
IOUtils.copy(zipIn, out);
zipIn.closeEntry();
Closeables.closeQuietly(out);
out = null;
}
}
finally {
Closeables.closeQuietly(out);
Closeables.closeQuietly(zipIn);
}
long endTime = System.currentTimeMillis();
log.info("Local processing of file[%s] done in %,d millis", cacheFile, endTime - downloadEndTime);
log.info("Deleting tmpFile[%s]", tmpFile);
tmpFile.delete();
return cacheFile;
}
catch (Exception e) {
throw new StorageAdapterLoadingException(e, e.getMessage());
}
finally {
S3Utils.closeStreamsQuietly(s3Obj);
if (tmpFile != null && tmpFile.exists()) {
log.warn("Deleting tmpFile[%s] in finally block. Why?", tmpFile);
tmpFile.delete();
}
}
}
private String computeCacheFilePath(String s3Bucket, String s3Path)
{
return new File(String.format("%s/%s", s3Bucket, s3Path)).getParent();
}
@Override
public boolean cleanSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException
{
Map<String, Object> loadSpec = segment.getLoadSpec();
File cacheFile = new File(
config.getCacheDirectory(),
computeCacheFilePath(
MapUtils.getString(loadSpec, BUCKET),
MapUtils.getString(loadSpec, KEY)
)
);
try {
log.info("Deleting directory[%s]", cacheFile);
FileUtils.deleteDirectory(cacheFile);
}
catch (IOException e) {
throw new StorageAdapterLoadingException(e, e.getMessage());
}
return true;
}
}
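For orientation, a minimal wiring sketch (not part of this commit) of how the puller is typically driven. It assumes jets3t AWSCredentials, that S3SegmentGetterConfig can be satisfied by overriding getCacheDirectory(), and a DataSegment whose loadSpec already carries the "bucket" and "key" entries read above; the credential strings and cache path are placeholders.
// Hypothetical usage sketch; values in angle brackets are placeholders.
File pullSegment(DataSegment segment) throws Exception
{
  RestS3Service s3Client = new RestS3Service(new AWSCredentials("<accessKey>", "<secretKey>"));
  S3SegmentGetterConfig config = new S3SegmentGetterConfig()
  {
    @Override
    public File getCacheDirectory()
    {
      return new File("/tmp/druid/indexCache");
    }
  };
  SegmentPuller puller = new S3ZippedSegmentPuller(s3Client, config);
  return puller.getSegmentFiles(segment); // zip contents land under the cache directory
}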

View File

@ -26,6 +26,6 @@ import com.metamx.druid.index.Segment;
*/
public interface SegmentLoader
{
public Segment getSegment(DataSegment loadSpec) throws StorageAdapterLoadingException;
public void cleanup(DataSegment loadSpec) throws StorageAdapterLoadingException;
public Segment getSegment(DataSegment loadSpec) throws SegmentLoadingException;
public void cleanup(DataSegment loadSpec) throws SegmentLoadingException;
}

View File

@ -21,13 +21,18 @@ package com.metamx.druid.loading;
import org.skife.config.Config;
import java.io.File;
/**
*/
public abstract class QueryableLoaderConfig extends S3SegmentGetterConfig
public abstract class SegmentLoaderConfig
{
@Config("druid.queryable.factory")
public String getQueryableFactoryType()
@Config({"druid.paths.indexCache", "druid.segmentCache.path"})
public abstract File getCacheDirectory();
@Config("druid.segmentCache.deleteOnRemove")
public boolean deleteOnRemove()
{
return "mmap";
return true;
}
}
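For context, SegmentLoaderConfig is a config-magic style abstract class. Below is a hedged sketch of how such a class is usually materialized from runtime properties, assuming the standard org.skife.config.ConfigurationObjectFactory API; the property names come from the @Config annotations above and the cache path is a placeholder.
// Illustrative only; not part of this commit.
Properties props = new Properties();
props.setProperty("druid.segmentCache.path", "/tmp/druid/indexCache");
// "druid.segmentCache.deleteOnRemove" is optional; deleteOnRemove() falls back to true.
SegmentLoaderConfig config = new ConfigurationObjectFactory(props).build(SegmentLoaderConfig.class);
File cacheDir = config.getCacheDirectory();       // -> /tmp/druid/indexCache
boolean deleteOnRemove = config.deleteOnRemove(); // -> true unless overridden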

View File

@ -21,9 +21,9 @@ package com.metamx.druid.loading;
/**
*/
public class StorageAdapterLoadingException extends Exception
public class SegmentLoadingException extends Exception
{
public StorageAdapterLoadingException(
public SegmentLoadingException(
String formatString,
Object... objs
)
@ -31,7 +31,7 @@ public class StorageAdapterLoadingException extends Exception
super(String.format(formatString, objs));
}
public StorageAdapterLoadingException(
public SegmentLoadingException(
Throwable cause,
String formatString,
Object... objs

View File

@ -19,40 +19,133 @@
package com.metamx.druid.loading;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.StreamUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment;
import org.apache.commons.io.FileUtils;
import java.io.*;
/**
 * Loads a single segment into the local cache and exposes it as a queryable Segment.
 */
public class SingleSegmentLoader implements SegmentLoader
{
private final SegmentPuller segmentPuller;
private static final Logger log = new Logger(SingleSegmentLoader.class);
private final DataSegmentPuller dataSegmentPuller;
private final QueryableIndexFactory factory;
private final SegmentLoaderConfig config;
private static final Joiner JOINER = Joiner.on("/").skipNulls();
@Inject
public SingleSegmentLoader(
SegmentPuller segmentPuller,
QueryableIndexFactory factory
DataSegmentPuller dataSegmentPuller,
QueryableIndexFactory factory,
SegmentLoaderConfig config
)
{
this.segmentPuller = segmentPuller;
this.dataSegmentPuller = dataSegmentPuller;
this.factory = factory;
this.config = config;
}
@Override
public Segment getSegment(DataSegment segment) throws StorageAdapterLoadingException
public Segment getSegment(DataSegment segment) throws SegmentLoadingException
{
final QueryableIndex index = factory.factorize(segmentPuller.getSegmentFiles(segment));
File segmentFiles = getSegmentFiles(segment);
final QueryableIndex index = factory.factorize(segmentFiles);
return new QueryableIndexSegment(segment.getIdentifier(), index);
}
@Override
public void cleanup(DataSegment segment) throws StorageAdapterLoadingException
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
segmentPuller.cleanSegmentFiles(segment);
File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
if (localStorageDir.exists()) {
long localLastModified = localStorageDir.lastModified();
long remoteLastModified = dataSegmentPuller.getLastModified(segment);
if (remoteLastModified > 0 && localLastModified >= remoteLastModified) {
log.info(
"Found localStorageDir[%s] with modified[%s], which is same or after remote[%s]. Using.",
localStorageDir, localLastModified, remoteLastModified
);
return localStorageDir;
}
}
if (localStorageDir.exists()) {
try {
FileUtils.deleteDirectory(localStorageDir);
}
catch (IOException e) {
log.warn(e, "Exception deleting previously existing local dir[%s]", localStorageDir);
}
}
if (!localStorageDir.mkdirs()) {
log.info("Unable to make parent file[%s]", localStorageDir);
}
dataSegmentPuller.getSegmentFiles(segment, localStorageDir);
return localStorageDir;
}
private File getLocalStorageDir(DataSegment segment)
{
String outputKey = JOINER.join(
segment.getDataSource(),
String.format(
"%s_%s",
segment.getInterval().getStart(),
segment.getInterval().getEnd()
),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
);
return new File(config.getCacheDirectory(), outputKey);
}
private void moveToCache(File pulledFile, File cacheFile) throws SegmentLoadingException
{
log.info("Rename pulledFile[%s] to cacheFile[%s]", pulledFile, cacheFile);
if (!pulledFile.renameTo(cacheFile)) {
log.warn("Error renaming pulledFile[%s] to cacheFile[%s]. Copying instead.", pulledFile, cacheFile);
try {
StreamUtils.copyToFileAndClose(new FileInputStream(pulledFile), cacheFile);
}
catch (IOException e) {
throw new SegmentLoadingException(
e,
"Problem moving pulledFile[%s] to cache[%s]",
pulledFile,
cacheFile
);
}
if (!pulledFile.delete()) {
log.error("Could not delete pulledFile[%s].", pulledFile);
}
}
}
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
File cacheFile = getLocalStorageDir(segment).getParentFile();
try {
log.info("Deleting directory[%s]", cacheFile);
FileUtils.deleteDirectory(cacheFile);
}
catch (IOException e) {
throw new SegmentLoadingException(e, e.getMessage());
}
}
}
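As a hypothetical usage sketch (not from this commit): a trivial DataSegmentPuller stub wired into the loader, just to make the contract above concrete. getSegmentFiles(segment, dir) must populate dir, and getLastModified(segment) drives the staleness check (a value of 0 is treated as unknown, so an existing cache directory is simply reused). This assumes DataSegmentPuller declares exactly those two methods and that the factory and config are supplied by the caller.
// Hypothetical sketch; the local "pull" itself is elided.
Segment loadWithLocalPuller(DataSegment segment, QueryableIndexFactory factory, SegmentLoaderConfig config)
    throws SegmentLoadingException
{
  DataSegmentPuller localPuller = new DataSegmentPuller()
  {
    @Override
    public void getSegmentFiles(DataSegment toPull, File dir)
    {
      // copy or download the segment's files into dir (elided)
    }

    @Override
    public long getLastModified(DataSegment toPull)
    {
      return 0L; // unknown; an already-populated cache dir will be reused
    }
  };

  SegmentLoader loader = new SingleSegmentLoader(localPuller, factory, config);
  return loader.getSegment(segment); // pulls if needed, then factorizes into a queryable Segment
}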

View File

@ -70,7 +70,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
if (holder.getLifetime() <= 0) {
log.makeAlert("[%s]: Balancer move segments queue has a segment stuck", tier)
.addData("segment", holder.getSegment().getIdentifier())
.addData("server", holder.getServer())
.addData("server", holder.getServer().getStringProps())
.emit();
}
}

View File

@ -31,7 +31,6 @@ import com.metamx.common.Pair;
import com.metamx.common.guava.ConcatSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.Capabilities;
import com.metamx.druid.Druids;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
@ -39,12 +38,9 @@ import com.metamx.druid.StorageAdapter;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.brita.Filter;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.SegmentIdAttachedStorageAdapter;
import com.metamx.druid.index.v1.processing.Cursor;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.StorageAdapterLoadingException;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.metrics.NoopServiceEmitter;
import com.metamx.druid.query.CacheStrategy;
import com.metamx.druid.query.ConcatQueryRunner;
@ -54,7 +50,6 @@ import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.search.SearchHit;
import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SearchResultValue;
@ -62,7 +57,6 @@ import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
@ -72,7 +66,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
@ -101,7 +94,7 @@ public class ServerManagerTest
}
@Override
public void cleanup(DataSegment segment) throws StorageAdapterLoadingException
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
}
@ -245,7 +238,7 @@ public class ServerManagerTest
)
);
}
catch (StorageAdapterLoadingException e) {
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
}
}
@ -267,7 +260,7 @@ public class ServerManagerTest
)
);
}
catch (StorageAdapterLoadingException e) {
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
}
}
@ -392,7 +385,7 @@ public class ServerManagerTest
}
}
public static class NoopQueryToolChest<T, QueryType extends Query<T>> implements QueryToolChest<T, QueryType>
public static class NoopQueryToolChest<T, QueryType extends Query<T>> extends QueryToolChest<T, QueryType>
{
@Override
public QueryRunner<T> mergeResults(QueryRunner<T> runner)
@ -423,23 +416,5 @@ public class ServerManagerTest
{
return new TypeReference<T>(){};
}
@Override
public <Typer> CacheStrategy<T, Typer, QueryType> getCacheStrategy(QueryType query)
{
return null;
}
@Override
public QueryRunner<T> preMergeQueryDecoration(QueryRunner<T> runner)
{
return runner;
}
@Override
public QueryRunner<T> postMergeQueryDecoration(QueryRunner<T> runner)
{
return runner;
}
}
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.index.v1;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
@ -111,4 +112,51 @@ public class IndexMergerTest
FileUtils.deleteQuietly(mergedDir);
}
}
@Test
public void testPersistEmptyColumn() throws Exception
{
final IncrementalIndex toPersist1 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{});
final IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{});
final File tmpDir1 = Files.createTempDir();
final File tmpDir2 = Files.createTempDir();
final File tmpDir3 = Files.createTempDir();
try {
toPersist1.add(
new MapBasedInputRow(
1L,
ImmutableList.of("dim1", "dim2"),
ImmutableMap.<String, Object>of("dim1", ImmutableList.of(), "dim2", "foo")
)
);
toPersist2.add(
new MapBasedInputRow(
1L,
ImmutableList.of("dim1", "dim2"),
ImmutableMap.<String, Object>of("dim1", ImmutableList.of(), "dim2", "bar")
)
);
final QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir1));
final QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tmpDir2));
final QueryableIndex merged = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3)
);
Assert.assertEquals(1, index1.getTimeColumn().getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index1.getAvailableDimensions()));
Assert.assertEquals(1, index2.getTimeColumn().getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions()));
Assert.assertEquals(1, merged.getTimeColumn().getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions()));
} finally {
FileUtils.deleteQuietly(tmpDir1);
FileUtils.deleteQuietly(tmpDir2);
FileUtils.deleteQuietly(tmpDir3);
}
}
}

View File

@ -30,7 +30,7 @@ import org.joda.time.Interval;
public class NoopSegmentLoader implements SegmentLoader
{
@Override
public Segment getSegment(final DataSegment segment) throws StorageAdapterLoadingException
public Segment getSegment(final DataSegment segment) throws SegmentLoadingException
{
return new Segment()
{
@ -61,7 +61,7 @@ public class NoopSegmentLoader implements SegmentLoader
}
@Override
public void cleanup(DataSegment loadSpec) throws StorageAdapterLoadingException
public void cleanup(DataSegment loadSpec) throws SegmentLoadingException
{
}
}