diff --git a/client/pom.xml b/client/pom.xml
index 302778496bc..3af4d9b489a 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
        <groupId>com.metamx</groupId>
        <artifactId>druid</artifactId>
-       <version>0.2.10-SNAPSHOT</version>
+       <version>0.3.4-SNAPSHOT</version>
diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
index 0612fb9337c..6d896d8bf0f 100644
--- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
+++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
@@ -139,18 +139,24 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
       // build set of segments to query
       Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();
 
+      List<TimelineObjectHolder<String, ServerSelector>> serversLookup = Lists.newLinkedList();
+
       for (Interval interval : rewrittenQuery.getIntervals()) {
-        List<TimelineObjectHolder<String, ServerSelector>> serversLookup = timeline.lookup(interval);
+        serversLookup.addAll(timeline.lookup(interval));
+      }
 
-        for (TimelineObjectHolder<String, ServerSelector> holder : serversLookup) {
-          for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
-            ServerSelector selector = chunk.getObject();
-            final SegmentDescriptor descriptor = new SegmentDescriptor(
-                holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
-            );
+      // Let tool chest filter out unneeded segments
+      final List<TimelineObjectHolder<String, ServerSelector>> filteredServersLookup =
+          toolChest.filterSegments(query, serversLookup);
 
-            segments.add(Pair.of(selector, descriptor));
-          }
+      for (TimelineObjectHolder<String, ServerSelector> holder : filteredServersLookup) {
+        for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
+          ServerSelector selector = chunk.getObject();
+          final SegmentDescriptor descriptor = new SegmentDescriptor(
+              holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
+          );
+
+          segments.add(Pair.of(selector, descriptor));
         }
       }
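
The hunk above routes every timeline lookup through the new QueryToolChest.filterSegments hook before the broker fans out per-segment work. A standalone sketch of the control flow (plain strings stand in for Druid's TimelineObjectHolder; the default hook simply returns its input):

```java
import java.util.Arrays;
import java.util.List;

public class FilterSegmentsDemo
{
  // Mirrors the default QueryToolChest.filterSegments: keep everything.
  static <T> List<T> filterSegments(List<T> segments)
  {
    return segments;
  }

  public static void main(String[] args)
  {
    List<String> serversLookup = Arrays.asList("2012-01-01/2012-01-02", "2012-01-02/2012-01-03");
    // CachingClusteredClient now iterates only over what the tool chest kept.
    for (String interval : filterSegments(serversLookup)) {
      System.out.println("querying " + interval);
    }
  }
}
```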
diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryManager.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryManager.java
index bd32a62791d..891d41aaec9 100644
--- a/client/src/main/java/com/metamx/druid/client/ServerInventoryManager.java
+++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryManager.java
@@ -19,7 +19,6 @@
package com.metamx.druid.client;
-import com.google.common.collect.Maps;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import com.metamx.phonebook.PhoneBook;
@@ -27,12 +26,13 @@ import com.metamx.phonebook.PhoneBookPeon;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
*/
 public class ServerInventoryManager extends InventoryManager<DruidServer>
{
-  private static final Map<String, Integer> removedSegments = Maps.newHashMap();
+  private static final Map<String, Integer> removedSegments = new ConcurrentHashMap<String, Integer>();
public ServerInventoryManager(
ServerInventoryManagerConfig config,
diff --git a/client/src/main/java/com/metamx/druid/initialization/Initialization.java b/client/src/main/java/com/metamx/druid/initialization/Initialization.java
index 4865208e88e..ba3b42d56e1 100644
--- a/client/src/main/java/com/metamx/druid/initialization/Initialization.java
+++ b/client/src/main/java/com/metamx/druid/initialization/Initialization.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
@@ -31,8 +30,8 @@ import com.metamx.druid.client.ZKPhoneBook;
import com.metamx.druid.http.FileRequestLogger;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.utils.PropUtils;
-import com.metamx.druid.zk.StringZkSerializer;
import com.metamx.druid.zk.PropertiesZkSerializer;
+import com.metamx.druid.zk.StringZkSerializer;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.ExponentialBackoffRetry;
@@ -42,7 +41,6 @@ import com.netflix.curator.x.discovery.ServiceInstance;
import com.netflix.curator.x.discovery.ServiceProvider;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
-
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.nio.SelectChannelConnector;
@@ -53,7 +51,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
/**
*/
diff --git a/client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java b/client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java
new file mode 100644
index 00000000000..02e0a7bd141
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java
@@ -0,0 +1,41 @@
+package com.metamx.druid.merge;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.joda.time.Interval;
+
+/**
+ */
+public class ClientKillQuery
+{
+ private final String dataSource;
+ private final Interval interval;
+
+ @JsonCreator
+ public ClientKillQuery(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("interval") Interval interval
+ )
+ {
+ this.dataSource = dataSource;
+ this.interval = interval;
+ }
+
+ @JsonProperty
+ public String getType()
+ {
+ return "kill";
+ }
+
+ @JsonProperty
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
+ @JsonProperty
+ public Interval getInterval()
+ {
+ return interval;
+ }
+}
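
ClientKillQuery is a plain Jackson bean; the hardcoded getType() marks it as a "kill" task when it is posted to the coordinator. A minimal serialization sketch, assuming jackson-databind and jackson-datatype-joda are on the classpath (the dataSource and interval values are hypothetical):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import com.metamx.druid.merge.ClientKillQuery;
import org.joda.time.Interval;

public class KillQueryJsonDemo
{
  public static void main(String[] args) throws Exception
  {
    // JodaModule teaches Jackson how to read/write org.joda.time.Interval.
    ObjectMapper mapper = new ObjectMapper().registerModule(new JodaModule());

    ClientKillQuery kill = new ClientKillQuery(
        "wikipedia",                              // hypothetical dataSource
        Interval.parse("2012-01-01/2012-02-01")
    );

    // Produces JSON carrying "type":"kill" plus the dataSource and interval.
    System.out.println(mapper.writeValueAsString(kill));
  }
}
```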
diff --git a/client/src/main/java/com/metamx/druid/query/QueryToolChest.java b/client/src/main/java/com/metamx/druid/query/QueryToolChest.java
index 854e2b97c9b..e2d227729d3 100644
--- a/client/src/main/java/com/metamx/druid/query/QueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/QueryToolChest.java
@@ -22,18 +22,19 @@ package com.metamx.druid.query;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Function;
 import com.metamx.common.guava.Sequence;
+import com.metamx.druid.LogicalSegment;
 import com.metamx.druid.Query;
 import com.metamx.emitter.service.ServiceMetricEvent;
-
+import java.util.List;
 
 /**
  * The broker-side (also used by server in some cases) API for a specific Query type. This API is still undergoing
  * evolution and is only semi-stable, so proprietary Query implementations should be ready for the potential
  * maintenance burden when upgrading versions.
  */
-public interface QueryToolChest<ResultType, QueryType extends Query<ResultType>>
+public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultType>>
 {
-  public QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner);
+  public abstract QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner);
 
   /**
    * This method doesn't belong here, but it's here for now just to make it work.
@@ -41,11 +42,24 @@ public interface QueryToolChest<ResultType, QueryType extends Query<ResultType>>
    * @param seqOfSequences
    * @return
    */
-  public Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences);
-  public ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
-  public Function<ResultType, ResultType> makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn);
-  public TypeReference<ResultType> getResultTypeReference();
-  public CacheStrategy<ResultType, QueryType> getCacheStrategy(QueryType query);
-  public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner);
-  public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner);
+  public abstract Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences);
+  public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
+  public abstract Function<ResultType, ResultType> makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn);
+  public abstract TypeReference<ResultType> getResultTypeReference();
+
+  public CacheStrategy<ResultType, QueryType> getCacheStrategy(QueryType query) {
+    return null;
+  }
+
+  public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner) {
+    return runner;
+  }
+
+  public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner) {
+    return runner;
+  }
+
+  public <T extends LogicalSegment> List<T> filterSegments(QueryType query, List<T> segments) {
+    return segments;
+  }
 }
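
Turning the interface into an abstract class lets the optional hooks (caching, pre/post-merge decoration, and the new filterSegments) ship no-op defaults, which is why the tool chests in the following hunks simply delete their boilerplate overrides. A toy illustration of the pattern, with illustrative names rather than the real Druid types:

```java
import java.util.Arrays;
import java.util.List;

// Required behavior stays abstract; optional hooks get harmless defaults.
abstract class MiniToolChest<T>
{
  public abstract T mergeResults(List<T> results);

  // Optional hook: only implementations that can prune need to override.
  public <S> List<S> filterSegments(List<S> segments)
  {
    return segments;
  }
}

public class MiniToolChestDemo
{
  public static void main(String[] args)
  {
    MiniToolChest<Integer> sum = new MiniToolChest<Integer>()
    {
      @Override
      public Integer mergeResults(List<Integer> results)
      {
        int total = 0;
        for (int r : results) {
          total += r;
        }
        return total;
      }
    };
    System.out.println(sum.mergeResults(Arrays.asList(1, 2, 3)));    // 6
    System.out.println(sum.filterSegments(Arrays.asList("a", "b"))); // [a, b]
  }
}
```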
diff --git a/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java
index e11f2885fd8..51570e30955 100644
--- a/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java
@@ -54,7 +54,7 @@ import java.util.Properties;
/**
*/
-public class GroupByQueryQueryToolChest implements QueryToolChest<Row, GroupByQuery>
+public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
 {
   private static final TypeReference<Row> TYPE_REFERENCE = new TypeReference<Row>(){};
@@ -177,22 +177,4 @@ public class GroupByQueryQueryToolChest implements QueryToolChest<Row, GroupByQuery>
-  @Override
-  public CacheStrategy<Row, GroupByQuery> getCacheStrategy(GroupByQuery query)
-  {
-    return null;
-  }
-
-  @Override
-  public QueryRunner<Row> preMergeQueryDecoration(QueryRunner<Row> runner)
-  {
-    return runner;
-  }
-
-  @Override
-  public QueryRunner<Row> postMergeQueryDecoration(QueryRunner<Row> runner)
-  {
-    return runner;
-  }
}
diff --git a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
index e7df4959b35..6ede9c7f247 100644
--- a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
@@ -51,7 +51,7 @@ import java.util.Map;
import java.util.Set;
-public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
+public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
 {
   private static final TypeReference<SegmentAnalysis> TYPE_REFERENCE = new TypeReference<SegmentAnalysis>(){};
   private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
@@ -220,18 +220,6 @@ public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
-  @Override
-  public QueryRunner<SegmentAnalysis> preMergeQueryDecoration(QueryRunner<SegmentAnalysis> runner)
-  {
-    return runner;
-  }
-
-  @Override
-  public QueryRunner<SegmentAnalysis> postMergeQueryDecoration(QueryRunner<SegmentAnalysis> runner)
-  {
-    return runner;
-  }
-
   private Ordering<SegmentAnalysis> getOrdering()
   {
     return new Ordering<SegmentAnalysis>()
diff --git a/client/src/main/java/com/metamx/druid/query/search/FragmentSearchQuerySpec.java b/client/src/main/java/com/metamx/druid/query/search/FragmentSearchQuerySpec.java
index b813b7febbf..ea6cdbe72f2 100644
--- a/client/src/main/java/com/metamx/druid/query/search/FragmentSearchQuerySpec.java
+++ b/client/src/main/java/com/metamx/druid/query/search/FragmentSearchQuerySpec.java
@@ -75,7 +75,7 @@ public class FragmentSearchQuerySpec implements SearchQuerySpec
public boolean accept(String dimVal)
{
for (String value : values) {
- if (!dimVal.toLowerCase().contains(value)) {
+ if (dimVal == null || !dimVal.toLowerCase().contains(value)) {
return false;
}
}
diff --git a/client/src/main/java/com/metamx/druid/query/search/InsensitiveContainsSearchQuerySpec.java b/client/src/main/java/com/metamx/druid/query/search/InsensitiveContainsSearchQuerySpec.java
index 1d7a836a3ab..f2ffbc9d1c4 100644
--- a/client/src/main/java/com/metamx/druid/query/search/InsensitiveContainsSearchQuerySpec.java
+++ b/client/src/main/java/com/metamx/druid/query/search/InsensitiveContainsSearchQuerySpec.java
@@ -62,6 +62,9 @@ public class InsensitiveContainsSearchQuerySpec implements SearchQuerySpec
@Override
public boolean accept(String dimVal)
{
+ if (dimVal == null) {
+ return false;
+ }
return dimVal.toLowerCase().contains(value);
}
@@ -80,8 +83,8 @@ public class InsensitiveContainsSearchQuerySpec implements SearchQuerySpec
public String toString()
{
return "InsensitiveContainsSearchQuerySpec{" +
- "value=" + value +
- ", sortSpec=" + sortSpec +
+ "value=" + value +
+ ", sortSpec=" + sortSpec +
"}";
}
}
diff --git a/client/src/main/java/com/metamx/druid/query/search/SearchQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/search/SearchQueryQueryToolChest.java
index 3313573adf8..f58f31ebad7 100644
--- a/client/src/main/java/com/metamx/druid/query/search/SearchQueryQueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/search/SearchQueryQueryToolChest.java
@@ -65,7 +65,7 @@ import java.util.Set;
/**
*/
-public class SearchQueryQueryToolChest implements QueryToolChest<Result<SearchResultValue>, SearchQuery>
+public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResultValue>, SearchQuery>
 {
   private static final byte SEARCH_QUERY = 0x2;
@@ -263,12 +263,6 @@ public class SearchQueryQueryToolChest implements QueryToolChest<Result<SearchResultValue>, SearchQuery>
-  @Override
-  public QueryRunner<Result<SearchResultValue>> postMergeQueryDecoration(final QueryRunner<Result<SearchResultValue>> runner)
-  {
-    return runner;
-  }
-
   private static class SearchThresholdAdjustingQueryRunner implements QueryRunner<Result<SearchResultValue>>
   {
     private final QueryRunner<Result<SearchResultValue>> runner;
diff --git a/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
index 7e902115ef8..dcf09526a31 100644
--- a/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Ordering;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
+import com.metamx.druid.LogicalSegment;
import com.metamx.druid.Query;
import com.metamx.druid.collect.OrderedMergeSequence;
import com.metamx.druid.query.BySegmentSkippingQueryRunner;
@@ -37,7 +38,6 @@ import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.TimeBoundaryResultValue;
import com.metamx.emitter.service.ServiceMetricEvent;
-
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@@ -47,7 +47,7 @@ import java.util.List;
/**
*/
public class TimeBoundaryQueryQueryToolChest
-    implements QueryToolChest<Result<TimeBoundaryResultValue>, TimeBoundaryQuery>
+    extends QueryToolChest<Result<TimeBoundaryResultValue>, TimeBoundaryQuery>
{
private static final byte TIMEBOUNDARY_QUERY = 0x3;
@@ -58,6 +58,16 @@ public class TimeBoundaryQueryQueryToolChest
{
};
+  @Override
+  public <T extends LogicalSegment> List<T> filterSegments(TimeBoundaryQuery query, List<T> input)
+  {
+    if(input.size() <= 1) {
+      return input;
+    }
+
+    return Lists.newArrayList(input.get(0), input.get(input.size() - 1));
+  }
+
   @Override
   public QueryRunner<Result<TimeBoundaryResultValue>> mergeResults(
       final QueryRunner<Result<TimeBoundaryResultValue>> runner
@@ -169,18 +179,6 @@ public class TimeBoundaryQueryQueryToolChest
};
}
-  @Override
-  public QueryRunner<Result<TimeBoundaryResultValue>> preMergeQueryDecoration(QueryRunner<Result<TimeBoundaryResultValue>> runner)
-  {
-    return runner;
-  }
-
-  @Override
-  public QueryRunner<Result<TimeBoundaryResultValue>> postMergeQueryDecoration(QueryRunner<Result<TimeBoundaryResultValue>> runner)
-  {
-    return runner;
-  }
-
   public Ordering<Result<TimeBoundaryResultValue>> getOrdering()
   {
     return Ordering.natural();
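
A time-boundary query only needs the earliest and latest segments, so the override above keeps just the first and last holders; this assumes the timeline lookup hands segments over sorted by interval. A standalone sketch of that pruning:

```java
import java.util.Arrays;
import java.util.List;

public class TimeBoundaryPruneDemo
{
  // Keep only the first and last entries of a time-ordered list,
  // mirroring TimeBoundaryQueryQueryToolChest.filterSegments.
  static <T> List<T> firstAndLast(List<T> input)
  {
    if (input.size() <= 1) {
      return input;
    }
    return Arrays.asList(input.get(0), input.get(input.size() - 1));
  }

  public static void main(String[] args)
  {
    List<String> holders = Arrays.asList("2012-01-01", "2012-01-02", "2012-01-03");
    System.out.println(firstAndLast(holders)); // [2012-01-01, 2012-01-03]
  }
}
```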
diff --git a/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 622626dfd57..bef34d9a2da 100644
--- a/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -61,7 +61,7 @@ import java.util.Map;
/**
*/
-public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
+public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
 {
   private static final byte TIMESERIES_QUERY = 0x0;
@@ -259,12 +259,6 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
     return new IntervalChunkingQueryRunner<Result<TimeseriesResultValue>>(runner, Period.months(1));
   }
 
-  @Override
-  public QueryRunner<Result<TimeseriesResultValue>> postMergeQueryDecoration(QueryRunner<Result<TimeseriesResultValue>> runner)
-  {
-    return runner;
-  }
-
   public Ordering<Result<TimeseriesResultValue>> getOrdering()
   {
     return Ordering.natural();
diff --git a/common/pom.xml b/common/pom.xml
index f2f878a6da1..03483c15caa 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
        <groupId>com.metamx</groupId>
        <artifactId>druid</artifactId>
-       <version>0.2.10-SNAPSHOT</version>
+       <version>0.3.4-SNAPSHOT</version>
@@ -80,8 +80,12 @@
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
-           <groupId>com.fasterxml.jackson.datatype</groupId>
-           <artifactId>jackson-datatype-joda</artifactId>
+           <groupId>com.fasterxml.jackson.datatype</groupId>
+           <artifactId>jackson-datatype-guava</artifactId>
+       </dependency>
+       <dependency>
+           <groupId>com.fasterxml.jackson.datatype</groupId>
+           <artifactId>jackson-datatype-joda</artifactId>
        </dependency>
        <dependency>
            <groupId>org.jdbi</groupId>
diff --git a/common/src/main/java/com/metamx/druid/LogicalSegment.java b/common/src/main/java/com/metamx/druid/LogicalSegment.java
new file mode 100644
index 00000000000..5550fb44966
--- /dev/null
+++ b/common/src/main/java/com/metamx/druid/LogicalSegment.java
@@ -0,0 +1,8 @@
+package com.metamx.druid;
+
+import org.joda.time.Interval;
+
+public interface LogicalSegment
+{
+ public Interval getInterval();
+}
diff --git a/common/src/main/java/com/metamx/druid/TimelineObjectHolder.java b/common/src/main/java/com/metamx/druid/TimelineObjectHolder.java
index 403fe8bb2cb..d93e5ed8f6b 100644
--- a/common/src/main/java/com/metamx/druid/TimelineObjectHolder.java
+++ b/common/src/main/java/com/metamx/druid/TimelineObjectHolder.java
@@ -24,7 +24,7 @@ import org.joda.time.Interval;
/**
*/
-public class TimelineObjectHolder<VersionType, ObjectType>
+public class TimelineObjectHolder<VersionType, ObjectType> implements LogicalSegment
{
private final Interval interval;
private final VersionType version;
@@ -41,6 +41,7 @@ public class TimelineObjectHolder<VersionType, ObjectType>
this.object = object;
}
+ @Override
public Interval getInterval()
{
return interval;
diff --git a/common/src/main/java/com/metamx/druid/jackson/DefaultObjectMapper.java b/common/src/main/java/com/metamx/druid/jackson/DefaultObjectMapper.java
index 84514bd6c4c..12079a77959 100644
--- a/common/src/main/java/com/metamx/druid/jackson/DefaultObjectMapper.java
+++ b/common/src/main/java/com/metamx/druid/jackson/DefaultObjectMapper.java
@@ -36,6 +36,7 @@ import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
+import com.fasterxml.jackson.datatype.guava.GuavaModule;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import com.google.common.base.Throwables;
import com.metamx.common.Granularity;
@@ -171,10 +172,11 @@ public class DefaultObjectMapper extends ObjectMapper
}
);
registerModule(serializerModule);
+ registerModule(new GuavaModule());
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
configure(MapperFeature.AUTO_DETECT_GETTERS, false);
- configure(MapperFeature.AUTO_DETECT_CREATORS, false);
+// configure(MapperFeature.AUTO_DETECT_CREATORS, false); https://github.com/FasterXML/jackson-databind/issues/170
configure(MapperFeature.AUTO_DETECT_FIELDS, false);
configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
configure(MapperFeature.AUTO_DETECT_SETTERS, false);
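
Registering GuavaModule makes the default mapper understand Guava types, notably Optional, which the new task-action APIs pass around. A quick sketch, assuming jackson-datatype-guava on the classpath:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.guava.GuavaModule;
import com.google.common.base.Optional;

public class GuavaModuleDemo
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper().registerModule(new GuavaModule());

    // A present Optional serializes as its value, an absent one as null,
    // instead of failing or serializing as an opaque bean.
    System.out.println(mapper.writeValueAsString(Optional.of("x")));  // "x"
    System.out.println(mapper.writeValueAsString(Optional.absent())); // null
  }
}
```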
diff --git a/common/src/main/java/com/metamx/druid/utils/CompressionUtils.java b/common/src/main/java/com/metamx/druid/utils/CompressionUtils.java
new file mode 100644
index 00000000000..c34b8e7e960
--- /dev/null
+++ b/common/src/main/java/com/metamx/druid/utils/CompressionUtils.java
@@ -0,0 +1,126 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.utils;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import com.metamx.common.ISE;
+import com.metamx.common.StreamUtils;
+import com.metamx.common.logger.Logger;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+
+/**
+ */
+public class CompressionUtils
+{
+ private static final Logger log = new Logger(CompressionUtils.class);
+
+ public static long zip(File directory, File outputZipFile) throws IOException
+ {
+ if (!directory.isDirectory()) {
+ throw new IOException(String.format("directory[%s] is not a directory", directory));
+ }
+
+ if (!outputZipFile.getName().endsWith(".zip")) {
+ log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory);
+ }
+
+ long totalSize = 0;
+ ZipOutputStream zipOut = null;
+ try {
+ zipOut = new ZipOutputStream(new FileOutputStream(outputZipFile));
+ File[] files = directory.listFiles();
+ for (File file : files) {
+ log.info("Adding file[%s] with size[%,d]. Total size[%,d]", file, file.length(), totalSize);
+ if (file.length() >= Integer.MAX_VALUE) {
+ zipOut.close();
+ outputZipFile.delete();
+ throw new IOException(String.format("file[%s] too large [%,d]", file, file.length()));
+ }
+ zipOut.putNextEntry(new ZipEntry(file.getName()));
+ totalSize += ByteStreams.copy(Files.newInputStreamSupplier(file), zipOut);
+ }
+ }
+ finally {
+ Closeables.closeQuietly(zipOut);
+ }
+
+ return totalSize;
+ }
+
+ public static void unzip(File pulledFile, File outDir) throws IOException
+ {
+ if (!(outDir.exists() && outDir.isDirectory())) {
+ throw new ISE("outDir[%s] must exist and be a directory", outDir);
+ }
+
+ log.info("Unzipping file[%s] to [%s]", pulledFile, outDir);
+ InputStream in = null;
+ try {
+ in = new BufferedInputStream(new FileInputStream(pulledFile));
+ unzip(in, outDir);
+ }
+ finally {
+ Closeables.closeQuietly(in);
+ }
+ }
+
+ public static void unzip(InputStream in, File outDir) throws IOException
+ {
+ ZipInputStream zipIn = new ZipInputStream(in);
+
+ ZipEntry entry;
+ while ((entry = zipIn.getNextEntry()) != null) {
+ OutputStream out = null;
+ try {
+ out = new FileOutputStream(new File(outDir, entry.getName()));
+ ByteStreams.copy(zipIn, out);
+ zipIn.closeEntry();
+ }
+ finally {
+ Closeables.closeQuietly(out);
+ }
+ }
+ }
+
+ public static void gunzip(File pulledFile, File outDir) throws IOException
+ {
+ log.info("Gunzipping file[%s] to [%s]", pulledFile, outDir);
+ StreamUtils.copyToFileAndClose(new GZIPInputStream(new FileInputStream(pulledFile)), outDir);
+ if (!pulledFile.delete()) {
+ log.error("Could not delete tmpFile[%s].", pulledFile);
+ }
+ }
+
+}
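
Typical use of the new utility is a segment round trip: zip a segment directory for upload, then unzip it on the other side. A usage sketch with hypothetical paths (zip() rejects non-directories, and unzip() requires an existing output directory):

```java
import com.metamx.druid.utils.CompressionUtils;
import java.io.File;

public class ZipRoundTripDemo
{
  public static void main(String[] args) throws Exception
  {
    File segmentDir = new File("/tmp/segment-to-ship");  // hypothetical
    File zipFile = new File("/tmp/segment.zip");         // hypothetical
    File restoreDir = new File("/tmp/segment-restored"); // hypothetical
    restoreDir.mkdirs();

    long bytes = CompressionUtils.zip(segmentDir, zipFile);
    System.out.println("zipped " + bytes + " bytes");

    CompressionUtils.unzip(zipFile, restoreDir);
  }
}
```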
diff --git a/common/src/main/java/com/metamx/druid/utils/JodaUtils.java b/common/src/main/java/com/metamx/druid/utils/JodaUtils.java
index 53b14618001..c7feeadc44c 100644
--- a/common/src/main/java/com/metamx/druid/utils/JodaUtils.java
+++ b/common/src/main/java/com/metamx/druid/utils/JodaUtils.java
@@ -19,12 +19,15 @@
package com.metamx.druid.utils;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.guava.Comparators;
import org.joda.time.DateTime;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.TreeSet;
@@ -63,6 +66,17 @@ public class JodaUtils
return retVal;
}
+  public static boolean overlaps(final Interval i, Iterable<Interval> intervals) {
+    return Iterables.any(intervals, new Predicate<Interval>()
+    {
+      @Override
+      public boolean apply(@Nullable Interval input)
+      {
+        return input.overlaps(i);
+      }
+    });
+  }
+
public static DateTime minDateTime(DateTime... times)
{
if (times == null) {
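
The new overlaps helper answers "does this interval touch any of these?" without the caller writing the loop. A usage sketch:

```java
import com.metamx.druid.utils.JodaUtils;
import org.joda.time.Interval;
import java.util.Arrays;

public class OverlapsDemo
{
  public static void main(String[] args)
  {
    Interval candidate = Interval.parse("2012-01-15/2012-01-20");

    // true: the candidate overlaps the first interval in the list.
    boolean touches = JodaUtils.overlaps(
        candidate,
        Arrays.asList(
            Interval.parse("2012-01-01/2012-01-16"),
            Interval.parse("2012-02-01/2012-02-02")
        )
    );
    System.out.println(touches);
  }
}
```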
diff --git a/druid-services/pom.xml b/druid-services/pom.xml
index 90716448ca1..b5b2a7e460c 100644
--- a/druid-services/pom.xml
+++ b/druid-services/pom.xml
@@ -24,11 +24,11 @@
    <artifactId>druid-services</artifactId>
    <name>druid-services</name>
    <description>druid-services</description>
-   <version>0.2.10-SNAPSHOT</version>
+   <version>0.3.4-SNAPSHOT</version>
        <groupId>com.metamx</groupId>
        <artifactId>druid</artifactId>
-       <version>0.2.10-SNAPSHOT</version>
+       <version>0.3.4-SNAPSHOT</version>
@@ -66,4 +66,4 @@
-</project>
\ No newline at end of file
+</project>
diff --git a/examples/pom.xml b/examples/pom.xml
index bfbdeb2f735..ae597675a62 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -28,7 +28,7 @@
        <groupId>com.metamx</groupId>
        <artifactId>druid</artifactId>
-       <version>0.2.10-SNAPSHOT</version>
+       <version>0.3.4-SNAPSHOT</version>
diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml
index 20bf45f8cae..b9ff5296fde 100644
--- a/examples/rand/pom.xml
+++ b/examples/rand/pom.xml
@@ -9,7 +9,7 @@
        <groupId>com.metamx</groupId>
        <artifactId>druid-examples</artifactId>
-       <version>0.2.10-SNAPSHOT</version>
+       <version>0.3.4-SNAPSHOT</version>
diff --git a/examples/rand/src/main/java/druid/examples/RealtimeStandaloneMain.java b/examples/rand/src/main/java/druid/examples/RealtimeStandaloneMain.java
index ecdf2606a3b..92eb86cc801 100644
--- a/examples/rand/src/main/java/druid/examples/RealtimeStandaloneMain.java
+++ b/examples/rand/src/main/java/druid/examples/RealtimeStandaloneMain.java
@@ -12,7 +12,7 @@ import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.realtime.MetadataUpdater;
import com.metamx.druid.realtime.MetadataUpdaterConfig;
import com.metamx.druid.realtime.RealtimeNode;
-import com.metamx.druid.loading.SegmentPusher;
+import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.phonebook.PhoneBook;
@@ -72,8 +72,8 @@ public class RealtimeStandaloneMain
// dummyMetadataUpdater will not send updates to db because standalone demo has no db
rn.setMetadataUpdater(dummyMetadataUpdater);
- rn.setSegmentPusher(
- new SegmentPusher()
+ rn.setDataSegmentPusher(
+ new DataSegmentPusher()
{
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml
index 42be38a8916..88f4beae7da 100644
--- a/examples/twitter/pom.xml
+++ b/examples/twitter/pom.xml
@@ -9,7 +9,7 @@
        <groupId>com.metamx</groupId>
        <artifactId>druid-examples</artifactId>
-       <version>0.2.10-SNAPSHOT</version>
+       <version>0.3.4-SNAPSHOT</version>
diff --git a/examples/twitter/src/main/java/druid/examples/RealtimeStandaloneMain.java b/examples/twitter/src/main/java/druid/examples/RealtimeStandaloneMain.java
index e936d481489..5f4d25cb95b 100644
--- a/examples/twitter/src/main/java/druid/examples/RealtimeStandaloneMain.java
+++ b/examples/twitter/src/main/java/druid/examples/RealtimeStandaloneMain.java
@@ -12,7 +12,7 @@ import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.realtime.MetadataUpdater;
import com.metamx.druid.realtime.MetadataUpdaterConfig;
import com.metamx.druid.realtime.RealtimeNode;
-import com.metamx.druid.loading.SegmentPusher;
+import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.phonebook.PhoneBook;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
@@ -48,10 +48,12 @@ public class RealtimeStandaloneMain
rn.setPhoneBook(dummyPhoneBook);
MetadataUpdater dummyMetadataUpdater =
- new MetadataUpdater(new DefaultObjectMapper(),
+ new MetadataUpdater(
+ new DefaultObjectMapper(),
Config.createFactory(Initialization.loadProperties()).build(MetadataUpdaterConfig.class),
dummyPhoneBook,
- null) {
+ null
+ ) {
@Override
public void publishSegment(DataSegment segment) throws IOException
{
@@ -74,8 +76,8 @@ public class RealtimeStandaloneMain
// dummyMetadataUpdater will not send updates to db because standalone demo has no db
rn.setMetadataUpdater(dummyMetadataUpdater);
- rn.setSegmentPusher(
- new SegmentPusher()
+ rn.setDataSegmentPusher(
+ new DataSegmentPusher()
{
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
diff --git a/examples/twitter/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java b/examples/twitter/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java
index 1bef29dfc6c..779ba107dab 100644
--- a/examples/twitter/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java
+++ b/examples/twitter/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java
@@ -62,7 +62,7 @@ import static java.lang.Thread.sleep;
* Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString()
* can have number format exceptions), so it might be necessary to extract raw json and process it
 * separately. If so, set twitter4j.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON();
- * org.codehaus.jackson.map.ObjectMapper should be used to parse.
+ * com.fasterxml.jackson.databind.ObjectMapper should be used to parse.
* @author pbaclace
*/
@JsonTypeName("twitzer")
diff --git a/index-common/pom.xml b/index-common/pom.xml
index 49db8c7b958..34264feb745 100644
--- a/index-common/pom.xml
+++ b/index-common/pom.xml
@@ -28,7 +28,7 @@
        <groupId>com.metamx</groupId>
        <artifactId>druid</artifactId>
-       <version>0.2.10-SNAPSHOT</version>
+       <version>0.3.4-SNAPSHOT</version>
@@ -94,4 +94,4 @@
-</project>
\ No newline at end of file
+</project>
diff --git a/index-common/src/main/java/com/metamx/druid/index/column/SimpleColumn.java b/index-common/src/main/java/com/metamx/druid/index/column/SimpleColumn.java
index 93825a8e9f0..8179cd623e4 100644
--- a/index-common/src/main/java/com/metamx/druid/index/column/SimpleColumn.java
+++ b/index-common/src/main/java/com/metamx/druid/index/column/SimpleColumn.java
@@ -72,30 +72,30 @@ class SimpleColumn implements Column
@Override
public DictionaryEncodedColumn getDictionaryEncoding()
{
- return dictionaryEncodedColumn.get();
+ return dictionaryEncodedColumn == null ? null : dictionaryEncodedColumn.get();
}
@Override
public RunLengthColumn getRunLengthColumn()
{
- return runLengthColumn.get();
+ return runLengthColumn == null ? null : runLengthColumn.get();
}
@Override
public GenericColumn getGenericColumn()
{
- return genericColumn.get();
+ return genericColumn == null ? null : genericColumn.get();
}
@Override
public ComplexColumn getComplexColumn()
{
- return complexColumn.get();
+ return complexColumn == null ? null : complexColumn.get();
}
@Override
public BitmapIndex getBitmapIndex()
{
- return bitmapIndex.get();
+ return bitmapIndex == null ? null : bitmapIndex.get();
}
}
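
Each accessor previously dereferenced its Supplier unconditionally and threw an NPE when the column lacked that capability; now absence is signalled by a null return that callers must check. A minimal stand-alone sketch of the guard (a Supplier-backed holder stands in for SimpleColumn):

```java
import com.google.common.base.Supplier;

public class NullGuardDemo
{
  // Stand-in for a SimpleColumn accessor: the capability may be absent.
  static <T> T getOrNull(Supplier<T> supplier)
  {
    return supplier == null ? null : supplier.get();
  }

  public static void main(String[] args)
  {
    Supplier<String> genericColumn = null; // column has no generic form
    String col = getOrNull(genericColumn);
    if (col == null) {
      System.out.println("capability absent; caller must handle null");
    }
  }
}
```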
diff --git a/indexer/pom.xml b/indexer/pom.xml
index ee05ae99737..778f682a0f2 100644
--- a/indexer/pom.xml
+++ b/indexer/pom.xml
@@ -28,7 +28,7 @@
        <groupId>com.metamx</groupId>
        <artifactId>druid</artifactId>
-       <version>0.2.10-SNAPSHOT</version>
+       <version>0.3.4-SNAPSHOT</version>
@@ -57,14 +57,6 @@
            <groupId>javax.mail</groupId>
            <artifactId>mail</artifactId>
        </dependency>
-       <dependency>
-           <groupId>org.codehaus.jackson</groupId>
-           <artifactId>jackson-core-asl</artifactId>
-       </dependency>
-       <dependency>
-           <groupId>org.codehaus.jackson</groupId>
-           <artifactId>jackson-mapper-asl</artifactId>
-       </dependency>
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
index d4ee1941396..f34ff2988f2 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
@@ -266,8 +266,7 @@ public class DeterminePartitionsJob implements Jobby
Context context
) throws IOException, InterruptedException
{
- // Create group key
- // TODO -- There are more efficient ways to do this
+ // Create group key, there are probably more efficient ways of doing this
       final Map<String, Set<String>> dims = Maps.newTreeMap();
       for(final String dim : inputRow.getDimensions()) {
         final Set<String> dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim));
@@ -394,6 +393,9 @@ public class DeterminePartitionsJob implements Jobby
final Interval interval = maybeInterval.get();
final byte[] groupKey = interval.getStart().toString().getBytes(Charsets.UTF_8);
+ // Emit row-counter value.
+ write(context, groupKey, new DimValueCount("", "", 1));
+
       for(final Map.Entry<String, Set<String>> dimAndValues : dims.entrySet()) {
final String dim = dimAndValues.getKey();
@@ -510,9 +512,23 @@ public class DeterminePartitionsJob implements Jobby
Context context, SortableBytes keyBytes, Iterable combinedIterable
) throws IOException, InterruptedException
{
-    PeekingIterator<DimValueCount> iterator = Iterators.peekingIterator(combinedIterable.iterator());
+    final DateTime bucket = new DateTime(new String(keyBytes.getGroupKey(), Charsets.UTF_8));
+    final PeekingIterator<DimValueCount> iterator = Iterators.peekingIterator(combinedIterable.iterator());
- // "iterator" will take us over many candidate dimensions
+ log.info(
+ "Determining partitions for interval: %s",
+ config.getGranularitySpec().bucketInterval(bucket).orNull()
+ );
+
+ // First DVC should be the total row count indicator
+ final DimValueCount firstDvc = iterator.next();
+ final int totalRows = firstDvc.numRows;
+
+ if(!firstDvc.dim.equals("") || !firstDvc.value.equals("")) {
+ throw new IllegalStateException("WTF?! Expected total row indicator on first k/v pair!");
+ }
+
+ // "iterator" will now take us over many candidate dimensions
DimPartitions currentDimPartitions = null;
DimPartition currentDimPartition = null;
String currentDimPartitionStart = null;
@@ -636,8 +652,6 @@ public class DeterminePartitionsJob implements Jobby
throw new ISE("No suitable partitioning dimension found!");
}
- final int totalRows = dimPartitionss.values().iterator().next().getRows();
-
int maxCardinality = Integer.MIN_VALUE;
long minVariance = Long.MAX_VALUE;
DimPartitions minVariancePartitions = null;
@@ -645,12 +659,14 @@ public class DeterminePartitionsJob implements Jobby
for(final DimPartitions dimPartitions : dimPartitionss.values()) {
if(dimPartitions.getRows() != totalRows) {
- throw new ISE(
- "WTF?! Dimension[%s] row count %,d != expected row count %,d",
+ log.info(
+ "Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
dimPartitions.dim,
dimPartitions.getRows(),
totalRows
);
+
+ continue;
}
// Make sure none of these shards are oversized
@@ -684,7 +700,6 @@ public class DeterminePartitionsJob implements Jobby
throw new ISE("No suitable partitioning dimension found!");
}
- final DateTime bucket = new DateTime(new String(keyBytes.getGroupKey(), Charsets.UTF_8));
final OutputStream out = Utils.makePathAndOutputStream(
context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles()
);
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
index eed2339114b..979e2d989a4 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
@@ -662,8 +662,9 @@ public class HadoopDruidIndexerConfig
return new Path(
String.format(
- "%s/%s_%s/%s/%s",
+ "%s/%s/%s_%s/%s/%s",
getSegmentOutputDir(),
+ dataSource,
bucketInterval.getStart().toString(),
bucketInterval.getEnd().toString(),
getVersion().toString(),
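
The extra %s inserts the dataSource into each segment's output path, so two dataSources indexed to the same output directory no longer collide. An illustration with hypothetical values:

```java
public class SegmentPathDemo
{
  public static void main(String[] args)
  {
    // Before: <outputDir>/<start>_<end>/<version>/<partition>
    // After:  <outputDir>/<dataSource>/<start>_<end>/<version>/<partition>
    System.out.println(
        String.format(
            "%s/%s/%s_%s/%s/%s",
            "s3n://bucket/segments",      // hypothetical output dir
            "wikipedia",                  // hypothetical dataSource
            "2012-01-01T00:00:00.000Z",
            "2012-01-02T00:00:00.000Z",
            "v1",
            0
        )
    );
  }
}
```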
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
index d8eba264c11..0620ba2bc85 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
@@ -379,7 +379,8 @@ public class IndexGeneratorJob implements Jobby
);
} else if (outputFS instanceof LocalFileSystem) {
loadSpec = ImmutableMap.of(
- "type", "test"
+ "type", "local",
+ "path", indexOutURI.getPath()
);
} else {
throw new ISE("Unknown file system[%s]", outputFS.getClass());
diff --git a/merger/pom.xml b/merger/pom.xml
index cc230b8e921..e39359574a5 100644
--- a/merger/pom.xml
+++ b/merger/pom.xml
@@ -28,7 +28,7 @@
        <groupId>com.metamx</groupId>
        <artifactId>druid</artifactId>
-       <version>0.2.10-SNAPSHOT</version>
+       <version>0.3.4-SNAPSHOT</version>
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskCallback.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskCallback.java
similarity index 90%
rename from merger/src/main/java/com/metamx/druid/merger/coordinator/TaskCallback.java
rename to merger/src/main/java/com/metamx/druid/merger/common/TaskCallback.java
index 549d0e7c4a7..edf3c9bae86 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskCallback.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskCallback.java
@@ -17,9 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
-package com.metamx.druid.merger.coordinator;
-
-import com.metamx.druid.merger.common.TaskStatus;
+package com.metamx.druid.merger.common;
public interface TaskCallback
{
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskHolder.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskHolder.java
deleted file mode 100644
index 9abb60d063a..00000000000
--- a/merger/src/main/java/com/metamx/druid/merger/common/TaskHolder.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package com.metamx.druid.merger.common;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.metamx.druid.merger.common.task.Task;
-import com.metamx.druid.merger.coordinator.TaskContext;
-
-
-
-/**
- */
-public class TaskHolder
-{
- private final Task task;
- private final TaskContext taskContext;
-
- @JsonCreator
- public TaskHolder(
- @JsonProperty("task") Task task,
- @JsonProperty("taskContext") TaskContext taskContext
- )
- {
- this.task = task;
- this.taskContext = taskContext;
- }
-
- @JsonProperty
- public Task getTask()
- {
- return task;
- }
-
- @JsonProperty
- public TaskContext getTaskContext()
- {
- return taskContext;
- }
-}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskGroup.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskLock.java
similarity index 55%
rename from merger/src/main/java/com/metamx/druid/merger/coordinator/TaskGroup.java
rename to merger/src/main/java/com/metamx/druid/merger/common/TaskLock.java
index 85b2ef62a04..1cb82725482 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskGroup.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskLock.java
@@ -17,40 +17,30 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
-package com.metamx.druid.merger.coordinator;
+package com.metamx.druid.merger.common;
-import com.google.common.base.Function;
import com.google.common.base.Objects;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
-import com.metamx.druid.merger.common.task.Task;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Interval;
-import java.util.Set;
-import java.util.TreeSet;
-
/**
- * Represents a transaction as well as the lock it holds. Not immutable: the task set can change.
+ * Represents a lock held by some task. Immutable.
*/
-public class TaskGroup
+public class TaskLock
{
private final String groupId;
private final String dataSource;
private final Interval interval;
private final String version;
-  private final Set<Task> taskSet = new TreeSet<Task>(
-      new Ordering<Task>()
-      {
-        @Override
-        public int compare(Task task, Task task1)
-        {
-          return task.getId().compareTo(task1.getId());
-        }
-      }.nullsFirst()
-  );
- public TaskGroup(String groupId, String dataSource, Interval interval, String version)
+ @JsonCreator
+ public TaskLock(
+ @JsonProperty("groupId") String groupId,
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("interval") Interval interval,
+ @JsonProperty("version") String version
+ )
{
this.groupId = groupId;
this.dataSource = dataSource;
@@ -58,29 +48,48 @@ public class TaskGroup
this.version = version;
}
+ @JsonProperty
public String getGroupId()
{
return groupId;
}
+ @JsonProperty
public String getDataSource()
{
return dataSource;
}
+ @JsonProperty
public Interval getInterval()
{
return interval;
}
+ @JsonProperty
public String getVersion()
{
return version;
}
-  public Set<Task> getTaskSet()
+ @Override
+ public boolean equals(Object o)
{
- return taskSet;
+ if (!(o instanceof TaskLock)) {
+ return false;
+ } else {
+ final TaskLock x = (TaskLock) o;
+ return Objects.equal(this.groupId, x.groupId) &&
+ Objects.equal(this.dataSource, x.dataSource) &&
+ Objects.equal(this.interval, x.interval) &&
+ Objects.equal(this.version, x.version);
+ }
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(groupId, dataSource, interval, version);
}
@Override
@@ -91,21 +100,6 @@ public class TaskGroup
.add("dataSource", dataSource)
.add("interval", interval)
.add("version", version)
- .add(
- "taskSet",
- Lists.newArrayList(
- Iterables.transform(
-            taskSet, new Function<Task, Object>()
- {
- @Override
- public Object apply(Task task)
- {
- return task.getId();
- }
- }
- )
- )
- )
.toString();
}
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskStatus.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskStatus.java
index d6cb18093b8..ec226429dd0 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/TaskStatus.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskStatus.java
@@ -20,64 +20,42 @@
package com.metamx.druid.merger.common;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.metamx.druid.client.DataSegment;
-import com.metamx.druid.merger.common.task.Task;
-
-
-
-import java.util.Collections;
-import java.util.List;
+/**
+ * Represents the status of a task. The task may be ongoing ({@link #isComplete()} false) or it may be
+ * complete ({@link #isComplete()} true).
+ *
+ * TaskStatus objects are immutable.
+ */
public class TaskStatus
{
public static enum Status
{
RUNNING,
SUCCESS,
- FAILED,
- CONTINUED
+ FAILED
}
public static TaskStatus running(String taskId)
{
-    return new TaskStatus(
-        taskId,
-        Status.RUNNING,
-        Collections.<DataSegment>emptyList(),
-        Collections.<Task>emptyList(),
-        -1
-    );
+    return new TaskStatus(taskId, Status.RUNNING, -1);
+ }
+
+ public static TaskStatus success(String taskId)
+ {
+ return new TaskStatus(taskId, Status.SUCCESS, -1);
}
public static TaskStatus failure(String taskId)
{
-    return new TaskStatus(taskId, Status.FAILED, Collections.<DataSegment>emptyList(), Collections.<Task>emptyList(), -1);
-  }
-
-  public static TaskStatus success(String taskId, List<DataSegment> segments)
-  {
-    return new TaskStatus(taskId, Status.SUCCESS, ImmutableList.copyOf(segments), Collections.<Task>emptyList(), -1);
-  }
-
-  public static TaskStatus continued(String taskId, List<Task> nextTasks)
-  {
-    Preconditions.checkArgument(nextTasks.size() > 0, "nextTasks.size() > 0");
-    return new TaskStatus(
-        taskId,
-        Status.CONTINUED,
-        Collections.<DataSegment>emptyList(),
-        ImmutableList.copyOf(nextTasks),
-        -1
-    );
+    return new TaskStatus(taskId, Status.FAILED, -1);
}
private final String id;
-  private final List<DataSegment> segments;
-  private final List<Task> nextTasks;
private final Status status;
private final long duration;
@@ -85,16 +63,16 @@ public class TaskStatus
private TaskStatus(
@JsonProperty("id") String id,
@JsonProperty("status") Status status,
- @JsonProperty("segments") List segments,
- @JsonProperty("nextTasks") List nextTasks,
@JsonProperty("duration") long duration
)
{
this.id = id;
- this.segments = segments;
- this.nextTasks = nextTasks;
this.status = status;
this.duration = duration;
+
+ // Check class invariants.
+ Preconditions.checkNotNull(id, "id");
+ Preconditions.checkNotNull(status, "status");
}
@JsonProperty("id")
@@ -109,18 +87,6 @@ public class TaskStatus
return status;
}
- @JsonProperty("segments")
- public List getSegments()
- {
- return segments;
- }
-
- @JsonProperty("nextTasks")
- public List getNextTasks()
- {
- return nextTasks;
- }
-
@JsonProperty("duration")
public long getDuration()
{
@@ -129,43 +95,38 @@ public class TaskStatus
/**
* Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
- * isContinued, isSuccess, or isFailure will be true at any one time.
+ * isSuccess, or isFailure will be true at any one time.
*/
+ @JsonIgnore
public boolean isRunnable()
{
return status == Status.RUNNING;
}
- /**
- * Returned by tasks when they complete successfully without spawning subtasks. Exactly one of isRunnable,
- * isContinued, isSuccess, or isFailure will be true at any one time.
- */
- public boolean isContinued()
- {
- return status == Status.CONTINUED;
- }
-
/**
* Inverse of {@link #isRunnable}.
*/
+ @JsonIgnore
public boolean isComplete()
{
return !isRunnable();
}
/**
- * Returned by tasks when they spawn subtasks. Exactly one of isRunnable, isContinued, isSuccess, or isFailure will
+   * Returned by tasks when they complete successfully. Exactly one of isRunnable, isSuccess, or isFailure will
* be true at any one time.
*/
+ @JsonIgnore
public boolean isSuccess()
{
return status == Status.SUCCESS;
}
/**
- * Returned by tasks when they complete unsuccessfully. Exactly one of isRunnable, isContinued, isSuccess, or
+ * Returned by tasks when they complete unsuccessfully. Exactly one of isRunnable, isSuccess, or
* isFailure will be true at any one time.
*/
+ @JsonIgnore
public boolean isFailure()
{
return status == Status.FAILED;
@@ -173,7 +134,7 @@ public class TaskStatus
public TaskStatus withDuration(long _duration)
{
- return new TaskStatus(id, status, segments, nextTasks, _duration);
+ return new TaskStatus(id, status, _duration);
}
@Override
@@ -181,8 +142,6 @@ public class TaskStatus
{
return Objects.toStringHelper(this)
.add("id", id)
- .add("segments", segments)
- .add("nextTasks", nextTasks)
.add("status", status)
.add("duration", duration)
.toString();
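
With segments and nextTasks gone (publishing now happens through task actions), a status is just id, state, and duration. A usage sketch against the class as it now stands:

```java
import com.metamx.druid.merger.common.TaskStatus;

public class TaskStatusDemo
{
  public static void main(String[] args)
  {
    TaskStatus running = TaskStatus.running("merge_wikipedia_2012-01-01");
    System.out.println(running.isRunnable()); // true

    // On completion, record how long the task took.
    TaskStatus done = TaskStatus.success("merge_wikipedia_2012-01-01").withDuration(1234L);
    System.out.println(done.isComplete() && done.isSuccess()); // true
  }
}
```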
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java
index 0cebe1fc91c..58ad3c5cc43 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java
@@ -20,19 +20,23 @@
package com.metamx.druid.merger.common;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableMap;
-import com.metamx.druid.loading.S3SegmentPuller;
-import com.metamx.druid.loading.S3SegmentGetterConfig;
-import com.metamx.druid.loading.S3ZippedSegmentPuller;
-import com.metamx.druid.loading.SegmentPuller;
+import com.google.common.collect.Maps;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.loading.DataSegmentPusher;
+import com.metamx.druid.loading.MMappedQueryableIndexFactory;
+import com.metamx.druid.loading.S3DataSegmentPuller;
+import com.metamx.druid.loading.SegmentKiller;
+import com.metamx.druid.loading.SegmentLoaderConfig;
+import com.metamx.druid.loading.SegmentLoadingException;
+import com.metamx.druid.loading.SingleSegmentLoader;
+import com.metamx.druid.merger.common.actions.TaskActionClient;
+import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.task.Task;
-import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
-import com.metamx.druid.loading.SegmentPusher;
import com.metamx.emitter.service.ServiceEmitter;
-
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.io.File;
+import java.util.List;
import java.util.Map;
/**
@@ -40,67 +44,84 @@ import java.util.Map;
*/
public class TaskToolbox
{
- private final IndexerCoordinatorConfig config;
+ private final TaskConfig config;
+ private final TaskActionClient taskActionClient;
private final ServiceEmitter emitter;
private final RestS3Service s3Client;
- private final SegmentPusher segmentPusher;
+ private final DataSegmentPusher segmentPusher;
+ private final SegmentKiller segmentKiller;
private final ObjectMapper objectMapper;
public TaskToolbox(
- IndexerCoordinatorConfig config,
+ TaskConfig config,
+ TaskActionClient taskActionClient,
ServiceEmitter emitter,
RestS3Service s3Client,
- SegmentPusher segmentPusher,
+ DataSegmentPusher segmentPusher,
+ SegmentKiller segmentKiller,
ObjectMapper objectMapper
)
{
this.config = config;
+ this.taskActionClient = taskActionClient;
this.emitter = emitter;
this.s3Client = s3Client;
this.segmentPusher = segmentPusher;
+ this.segmentKiller = segmentKiller;
this.objectMapper = objectMapper;
}
- public IndexerCoordinatorConfig getConfig()
+ public TaskConfig getConfig()
{
return config;
}
+ public TaskActionClient getTaskActionClient()
+ {
+ return taskActionClient;
+ }
+
public ServiceEmitter getEmitter()
{
return emitter;
}
- public RestS3Service getS3Client()
- {
- return s3Client;
- }
-
- public SegmentPusher getSegmentPusher()
+ public DataSegmentPusher getSegmentPusher()
{
return segmentPusher;
}
+ public SegmentKiller getSegmentKiller()
+ {
+ return segmentKiller;
+ }
+
public ObjectMapper getObjectMapper()
{
return objectMapper;
}
-  public Map<String, SegmentPuller> getSegmentGetters(final Task task)
+  public Map<DataSegment, File> getSegments(final Task task, List<DataSegment> segments)
+      throws SegmentLoadingException
{
- final S3SegmentGetterConfig getterConfig = new S3SegmentGetterConfig()
- {
- @Override
- public File getCacheDirectory()
- {
- return new File(config.getTaskDir(task), "fetched_segments");
- }
- };
+ final SingleSegmentLoader loader = new SingleSegmentLoader(
+ new S3DataSegmentPuller(s3Client),
+ new MMappedQueryableIndexFactory(),
+ new SegmentLoaderConfig()
+ {
+ @Override
+ public File getCacheDirectory()
+ {
+ return new File(config.getTaskDir(task), "fetched_segments");
+ }
+ }
+ );
-    return ImmutableMap.<String, SegmentPuller>builder()
-                       .put("s3", new S3SegmentPuller(s3Client, getterConfig))
-                       .put("s3_union", new S3SegmentPuller(s3Client, getterConfig))
-                       .put("s3_zip", new S3ZippedSegmentPuller(s3Client, getterConfig))
-                       .build();
+    Map<DataSegment, File> retVal = Maps.newLinkedHashMap();
+ for (DataSegment segment : segments) {
+ retVal.put(segment, loader.getSegmentFiles(segment));
+ }
+
+ return retVal;
}
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/LocalTaskActionClient.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/LocalTaskActionClient.java
new file mode 100644
index 00000000000..f6740064f52
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/LocalTaskActionClient.java
@@ -0,0 +1,37 @@
+package com.metamx.druid.merger.common.actions;
+
+import com.metamx.druid.merger.coordinator.TaskStorage;
+import com.metamx.emitter.EmittingLogger;
+
+public class LocalTaskActionClient implements TaskActionClient
+{
+ private final TaskStorage storage;
+ private final TaskActionToolbox toolbox;
+
+ private static final EmittingLogger log = new EmittingLogger(LocalTaskActionClient.class);
+
+ public LocalTaskActionClient(TaskStorage storage, TaskActionToolbox toolbox)
+ {
+ this.storage = storage;
+ this.toolbox = toolbox;
+ }
+
+ @Override
+  public <RetType> RetType submit(TaskAction<RetType> taskAction)
+ {
+ final RetType ret = taskAction.perform(toolbox);
+
+ // Add audit log
+ try {
+ storage.addAuditLog(taskAction);
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Failed to record action in audit log")
+ .addData("task", taskAction.getTask().getId())
+ .addData("actionClass", taskAction.getClass().getName())
+ .emit();
+ }
+
+ return ret;
+ }
+}
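
submit() runs the action against the coordinator-side toolbox and then best-effort audit-logs it. A sketch of how a task might use the client (the wiring of client and task is assumed, not shown in this patch):

```java
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.merger.common.task.Task;

import java.util.List;

public class ActionClientDemo
{
  public static void printLocks(TaskActionClient client, Task task)
  {
    // Ask the coordinator-side lockbox which locks this task holds.
    List<TaskLock> locks = client.submit(new LockListAction(task));
    System.out.println("task holds " + locks.size() + " lock(s)");
  }
}
```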
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/LockAcquireAction.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/LockAcquireAction.java
new file mode 100644
index 00000000000..f669af33625
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/LockAcquireAction.java
@@ -0,0 +1,53 @@
+package com.metamx.druid.merger.common.actions;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.metamx.druid.merger.common.TaskLock;
+import com.metamx.druid.merger.common.task.Task;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.joda.time.Interval;
+
+public class LockAcquireAction implements TaskAction<Optional<TaskLock>>
+{
+ private final Task task;
+ private final Interval interval;
+
+ @JsonCreator
+ public LockAcquireAction(
+ @JsonProperty("task") Task task,
+ @JsonProperty("interval") Interval interval
+ )
+ {
+ this.task = task;
+ this.interval = interval;
+ }
+
+ @JsonProperty
+ public Task getTask()
+ {
+ return task;
+ }
+
+ @JsonProperty
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+  public TypeReference<Optional<TaskLock>> getReturnTypeReference()
+  {
+    return new TypeReference<Optional<TaskLock>>() {};
+  }
+
+ @Override
+  public Optional<TaskLock> perform(TaskActionToolbox toolbox)
+ {
+ try {
+ return toolbox.getTaskLockbox().tryLock(task, interval);
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/LockListAction.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/LockListAction.java
new file mode 100644
index 00000000000..e0e3eddb71f
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/LockListAction.java
@@ -0,0 +1,45 @@
+package com.metamx.druid.merger.common.actions;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.metamx.druid.merger.common.TaskLock;
+import com.metamx.druid.merger.common.task.Task;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.util.List;
+
+public class LockListAction implements TaskAction<List<TaskLock>>
+{
+ private final Task task;
+
+ @JsonCreator
+ public LockListAction(
+ @JsonProperty("task") Task task
+ )
+ {
+ this.task = task;
+ }
+
+ @JsonProperty
+ public Task getTask()
+ {
+ return task;
+ }
+
+  public TypeReference<List<TaskLock>> getReturnTypeReference()
+  {
+    return new TypeReference<List<TaskLock>>() {};
+  }
+
+ @Override
+  public List<TaskLock> perform(TaskActionToolbox toolbox)
+ {
+ try {
+ return toolbox.getTaskLockbox().findLocksForTask(task);
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/LockReleaseAction.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/LockReleaseAction.java
new file mode 100644
index 00000000000..5c84d024a50
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/LockReleaseAction.java
@@ -0,0 +1,55 @@
+package com.metamx.druid.merger.common.actions;
+
+import com.google.common.base.Throwables;
+import com.metamx.druid.merger.common.TaskLock;
+import com.metamx.druid.merger.common.task.Task;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+public class LockReleaseAction implements TaskAction<Void>
+{
+ private final Task task;
+ private final Interval interval;
+
+ @JsonCreator
+ public LockReleaseAction(
+ @JsonProperty("task") Task task,
+ @JsonProperty("interval") Interval interval
+ )
+ {
+ this.task = task;
+ this.interval = interval;
+ }
+
+ @JsonProperty
+ public Task getTask()
+ {
+ return task;
+ }
+
+ @JsonProperty
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+  public TypeReference<Void> getReturnTypeReference()
+  {
+    return new TypeReference<Void>() {};
+  }
+
+ @Override
+ public Void perform(TaskActionToolbox toolbox)
+ {
+ try {
+ toolbox.getTaskLockbox().unlock(task, interval);
+ return null;
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java
new file mode 100644
index 00000000000..26900e29942
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java
@@ -0,0 +1,75 @@
+package com.metamx.druid.merger.common.actions;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.metamx.common.logger.Logger;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.response.ToStringResponseHandler;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.netflix.curator.x.discovery.ServiceInstance;
+import com.netflix.curator.x.discovery.ServiceProvider;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+public class RemoteTaskActionClient implements TaskActionClient
+{
+ private final HttpClient httpClient;
+ private final ServiceProvider serviceProvider;
+ private final ObjectMapper jsonMapper;
+
+ private static final Logger log = new Logger(RemoteTaskActionClient.class);
+
+ public RemoteTaskActionClient(HttpClient httpClient, ServiceProvider serviceProvider, ObjectMapper jsonMapper)
+ {
+ this.httpClient = httpClient;
+ this.serviceProvider = serviceProvider;
+ this.jsonMapper = jsonMapper;
+ }
+
+ @Override
+  public <RetType> RetType submit(TaskAction<RetType> taskAction)
+ {
+ try {
+ byte[] dataToSend = jsonMapper.writeValueAsBytes(taskAction);
+
+ final String response = httpClient.post(getServiceUri().toURL())
+ .setContent("application/json", dataToSend)
+ .go(new ToStringResponseHandler(Charsets.UTF_8))
+ .get();
+
+      final Map<String, Object> responseDict = jsonMapper.readValue(
+ response,
+ new TypeReference