Merge branch 'master' into realtime-index-task

Conflicts:
	merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentInsertAction.java
Gian Merlino committed 2013-03-14 13:45:59 -07:00
commit f6752799bc
7 changed files with 210 additions and 4 deletions

com/metamx/druid/query/filter/DimFilter.java

@@ -33,7 +33,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
     @JsonSubTypes.Type(name="selector", value=SelectorDimFilter.class),
     @JsonSubTypes.Type(name="extraction", value=ExtractionDimFilter.class),
     @JsonSubTypes.Type(name="regex", value=RegexDimFilter.class),
-    @JsonSubTypes.Type(name="search", value=SearchQueryDimFilter.class)
+    @JsonSubTypes.Type(name="search", value=SearchQueryDimFilter.class),
+    @JsonSubTypes.Type(name="javascript", value=JavaScriptDimFilter.class)
 })
 public interface DimFilter
 {

com/metamx/druid/query/filter/DimFilterCacheHelper.java

@@ -34,6 +34,7 @@ class DimFilterCacheHelper
   static final byte EXTRACTION_CACHE_ID = 0x4;
   static final byte REGEX_CACHE_ID = 0x5;
   static final byte SEARCH_QUERY_TYPE_ID = 0x6;
+  static final byte JAVASCRIPT_CACHE_ID = 0x7;

   static byte[] computeCacheKey(byte cacheIdKey, List<DimFilter> filters)
   {

com/metamx/druid/query/filter/JavaScriptDimFilter.java (new file)

@@ -0,0 +1,48 @@
package com.metamx.druid.query.filter;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;

import java.nio.ByteBuffer;

public class JavaScriptDimFilter implements DimFilter
{
  private final String dimension;
  private final String function;

  @JsonCreator
  public JavaScriptDimFilter(
      @JsonProperty("dimension") String dimension,
      @JsonProperty("function") String function
  )
  {
    this.dimension = dimension;
    this.function = function;
  }

  @JsonProperty
  public String getDimension()
  {
    return dimension;
  }

  @JsonProperty
  public String getFunction()
  {
    return function;
  }

  @Override
  public byte[] getCacheKey()
  {
    final byte[] dimensionBytes = dimension.getBytes(Charsets.UTF_8);
    final byte[] functionBytes = function.getBytes(Charsets.UTF_8);

    return ByteBuffer.allocate(1 + dimensionBytes.length + functionBytes.length)
                     .put(DimFilterCacheHelper.JAVASCRIPT_CACHE_ID)
                     .put(dimensionBytes)
                     .put(functionBytes)
                     .array();
  }
}
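For reference, a rough sketch (not part of this commit) of the JSON spec the new filter maps to and how it deserializes through the DimFilter subtype registration above. The dimension name and script are made up, and it assumes DimFilter's @JsonTypeInfo annotation keys the subtype off a "type" property:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.query.filter.DimFilter;
import com.metamx.druid.query.filter.JavaScriptDimFilter;

public class JavaScriptDimFilterExample
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical filter spec; "type" is assumed to be the discriminator property.
    final String json = "{"
                        + "\"type\": \"javascript\","
                        + "\"dimension\": \"page\","
                        + "\"function\": \"function(value) { return value != null && value.indexOf('Wiki') == 0 }\""
                        + "}";

    final DimFilter filter = new ObjectMapper().readValue(json, DimFilter.class);

    // With the "javascript" subtype registered above, this is a JavaScriptDimFilter.
    final JavaScriptDimFilter jsFilter = (JavaScriptDimFilter) filter;
    System.out.println(jsFilter.getDimension() + " -> " + jsFilter.getFunction());
  }
}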

merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentInsertAction.java

@@ -1,6 +1,7 @@
 package com.metamx.druid.merger.common.actions;

 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
@@ -14,14 +15,25 @@ import java.util.Set;
 public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
 {
+  @JsonIgnore
   private final Set<DataSegment> segments;

+  @JsonIgnore
+  private final boolean allowOlderVersions;
+
+  public SegmentInsertAction(Set<DataSegment> segments)
+  {
+    this(segments, false);
+  }
+
   @JsonCreator
   public SegmentInsertAction(
-      @JsonProperty("segments") Set<DataSegment> segments
+      @JsonProperty("segments") Set<DataSegment> segments,
+      @JsonProperty("allowOlderVersions") boolean allowOlderVersions
   )
   {
     this.segments = ImmutableSet.copyOf(segments);
+    this.allowOlderVersions = allowOlderVersions;
   }

   @JsonProperty
@@ -30,6 +42,17 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
     return segments;
   }

+  @JsonProperty
+  public boolean isAllowOlderVersions()
+  {
+    return allowOlderVersions;
+  }
+
+  public SegmentInsertAction withAllowOlderVersions(boolean _allowOlderVersions)
+  {
+    return new SegmentInsertAction(segments, _allowOlderVersions);
+  }
+
   public TypeReference<Set<DataSegment>> getReturnTypeReference()
   {
     return new TypeReference<Set<DataSegment>>() {};
@@ -38,7 +61,7 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
   @Override
   public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
   {
-    if(!toolbox.taskLockCoversSegments(task, segments, false)) {
+    if(!toolbox.taskLockCoversSegments(task, segments, allowOlderVersions)) {
       throw new ISE("Segments not covered by locks for task[%s]: %s", task.getId(), segments);
     }

VersionConverterTask.java

@@ -236,7 +236,8 @@ public class VersionConverterTask extends AbstractTask
       DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion));
       updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment);

-      toolbox.getTaskActionClient().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
+      toolbox.getTaskActionClient()
+             .submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)).withAllowOlderVersions(true));
     } else {
       log.info("Conversion failed.");
     }

Filters.java

@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
 import com.metamx.druid.query.filter.AndDimFilter;
 import com.metamx.druid.query.filter.DimFilter;
 import com.metamx.druid.query.filter.ExtractionDimFilter;
+import com.metamx.druid.query.filter.JavaScriptDimFilter;
 import com.metamx.druid.query.filter.NotDimFilter;
 import com.metamx.druid.query.filter.OrDimFilter;
 import com.metamx.druid.query.filter.RegexDimFilter;
@@ -84,6 +85,10 @@ public class Filters
       final SearchQueryDimFilter searchQueryFilter = (SearchQueryDimFilter) dimFilter;
       filter = new SearchQueryFilter(searchQueryFilter.getDimension(), searchQueryFilter.getQuery());
+    } else if (dimFilter instanceof JavaScriptDimFilter) {
+      final JavaScriptDimFilter javaScriptDimFilter = (JavaScriptDimFilter) dimFilter;
+      filter = new JavaScriptFilter(javaScriptDimFilter.getDimension(), javaScriptDimFilter.getFunction());
     }

     return filter;

com/metamx/druid/index/brita/JavaScriptFilter.java (new file)

@@ -0,0 +1,127 @@
package com.metamx.druid.index.brita;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.metamx.common.guava.FunctionalIterable;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.mozilla.javascript.Context;
import org.mozilla.javascript.Function;
import org.mozilla.javascript.ScriptableObject;

import javax.annotation.Nullable;

public class JavaScriptFilter implements Filter
{
  private final JavaScriptPredicate predicate;
  private final String dimension;

  public JavaScriptFilter(String dimension, final String script)
  {
    this.dimension = dimension;
    this.predicate = new JavaScriptPredicate(script);
  }

  @Override
  public ImmutableConciseSet goConcise(final BitmapIndexSelector selector)
  {
    final Context cx = Context.enter();
    try {
      ImmutableConciseSet conciseSet = ImmutableConciseSet.union(
          FunctionalIterable.create(selector.getDimensionValues(dimension))
                            .filter(new Predicate<String>()
                            {
                              @Override
                              public boolean apply(@Nullable String input)
                              {
                                return predicate.applyInContext(cx, input);
                              }
                            })
                            .transform(
                                new com.google.common.base.Function<String, ImmutableConciseSet>()
                                {
                                  @Override
                                  public ImmutableConciseSet apply(@Nullable String input)
                                  {
                                    return selector.getConciseInvertedIndex(dimension, input);
                                  }
                                }
                            )
      );
      return conciseSet;
    } finally {
      Context.exit();
    }
  }

  @Override
  public ValueMatcher makeMatcher(ValueMatcherFactory factory)
  {
    // suboptimal, since we need to create one context per call to predicate.apply()
    return factory.makeValueMatcher(dimension, predicate);
  }

  static class JavaScriptPredicate implements Predicate<String> {
    final ScriptableObject scope;
    final Function fnApply;
    final String script;

    public JavaScriptPredicate(final String script) {
      Preconditions.checkNotNull(script, "script must not be null");
      this.script = script;

      final Context cx = Context.enter();
      try {
        cx.setOptimizationLevel(9);
        scope = cx.initStandardObjects();

        fnApply = cx.compileFunction(scope, script, "script", 1, null);
      } finally {
        Context.exit();
      }
    }

    @Override
    public boolean apply(final String input)
    {
      // one and only one context per thread
      final Context cx = Context.enter();
      try {
        return applyInContext(cx, input);
      } finally {
        Context.exit();
      }
    }

    public boolean applyInContext(Context cx, String input)
    {
      return Context.toBoolean(fnApply.call(cx, scope, scope, new String[]{input}));
    }

    @Override
    public boolean equals(Object o)
    {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      JavaScriptPredicate that = (JavaScriptPredicate) o;

      if (!script.equals(that.script)) {
        return false;
      }

      return true;
    }

    @Override
    public int hashCode()
    {
      return script.hashCode();
    }
  }
}
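For reference, a rough usage sketch (not part of this commit) of the predicate at the heart of the filter: the script is compiled once in the constructor, each dimension value is passed to it as a single String argument (possibly null), and the result is coerced to a boolean via Context.toBoolean(). The example sits in the same package because JavaScriptPredicate is package-private, and the script and values are made up:

package com.metamx.druid.index.brita;

public class JavaScriptPredicateExample
{
  public static void main(String[] args)
  {
    // Hypothetical script: keep only dimension values starting with "Wiki".
    final JavaScriptFilter.JavaScriptPredicate predicate = new JavaScriptFilter.JavaScriptPredicate(
        "function(value) { return value != null && value.indexOf('Wiki') == 0 }"
    );

    System.out.println(predicate.apply("Wikipedia")); // true
    System.out.println(predicate.apply("Google"));    // false
    System.out.println(predicate.apply(null));        // false, the script checks for null
  }
}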