Merge branch 'master' into task-stuff

This commit is contained in:
Gian Merlino 2013-01-25 11:34:18 -08:00
commit 553738e1d8
24 changed files with 1636 additions and 799 deletions

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
@ -41,7 +42,6 @@ import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.cache.Cache;
import com.metamx.druid.client.cache.CacheBroker;
import com.metamx.druid.client.selector.ServerSelector;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.query.CacheStrategy;
@ -64,6 +64,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
/**
@ -74,19 +75,19 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
private final QueryToolChestWarehouse warehouse;
private final ServerView serverView;
private final CacheBroker cacheBroker;
private final Cache cache;
private final ObjectMapper objectMapper;
public CachingClusteredClient(
QueryToolChestWarehouse warehouse,
ServerView serverView,
CacheBroker cacheBroker,
Cache cache,
ObjectMapper objectMapper
)
{
this.warehouse = warehouse;
this.serverView = serverView;
this.cacheBroker = cacheBroker;
this.cache = cache;
this.objectMapper = objectMapper;
serverView.registerSegmentCallback(
@ -98,7 +99,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
@Override
public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment)
{
CachingClusteredClient.this.cacheBroker.provideCache(segment.getIdentifier()).close();
CachingClusteredClient.this.cache.close(segment.getIdentifier());
return ServerView.CallbackAction.CONTINUE;
}
}
@ -111,7 +112,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
final CacheStrategy<T, Query<T>> strategy = toolChest.getCacheStrategy(query);
final Map<DruidServer, List<SegmentDescriptor>> segs = Maps.newTreeMap();
final Map<DruidServer, List<SegmentDescriptor>> serverSegments = Maps.newTreeMap();
final List<Pair<DateTime, byte[]>> cachedResults = Lists.newArrayList();
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
@ -131,10 +133,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
return Sequences.empty();
}
byte[] queryCacheKey = null;
if (strategy != null) {
queryCacheKey = strategy.computeCacheKey(query);
}
// build set of segments to query
Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();
for (Interval interval : rewrittenQuery.getIntervals()) {
List<TimelineObjectHolder<String, ServerSelector>> serversLookup = timeline.lookup(interval);
@ -146,55 +146,67 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
);
if (queryCacheKey == null) {
final DruidServer server = selector.pick();
List<SegmentDescriptor> descriptors = segs.get(server);
if (descriptors == null) {
descriptors = Lists.newArrayList();
segs.put(server, descriptors);
}
descriptors.add(descriptor);
}
else {
final Interval segmentQueryInterval = holder.getInterval();
final byte[] versionBytes = descriptor.getVersion().getBytes();
final byte[] cacheKey = ByteBuffer
.allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
.putLong(segmentQueryInterval.getStartMillis())
.putLong(segmentQueryInterval.getEndMillis())
.put(versionBytes)
.putInt(descriptor.getPartitionNumber())
.put(queryCacheKey)
.array();
final String segmentIdentifier = selector.getSegment().getIdentifier();
final Cache cache = cacheBroker.provideCache(segmentIdentifier);
final byte[] cachedValue = cache.get(cacheKey);
if (useCache && cachedValue != null) {
cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
} else {
final DruidServer server = selector.pick();
List<SegmentDescriptor> descriptors = segs.get(server);
if (descriptors == null) {
descriptors = Lists.newArrayList();
segs.put(server, descriptors);
}
descriptors.add(descriptor);
cachePopulatorMap.put(
String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
new CachePopulator(cache, objectMapper, cacheKey)
);
}
}
segments.add(Pair.of(selector, descriptor));
}
}
}
final byte[] queryCacheKey;
if(strategy != null) {
queryCacheKey = strategy.computeCacheKey(query);
} else {
queryCacheKey = null;
}
// Pull cached segments from cache and remove from set of segments to query
if(useCache && queryCacheKey != null) {
Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
for(Pair<ServerSelector, SegmentDescriptor> e : segments) {
cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey));
}
Map<Cache.NamedKey, byte[]> cachedValues = cache.getBulk(cacheKeys.values());
for(Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) {
Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey();
Cache.NamedKey segmentCacheKey = entry.getValue();
final ServerSelector selector = segment.lhs;
final SegmentDescriptor descriptor = segment.rhs;
final Interval segmentQueryInterval = descriptor.getInterval();
final byte[] cachedValue = cachedValues.get(segmentCacheKey);
if (cachedValue != null) {
cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
// remove cached segment from set of segments to query
segments.remove(segment);
}
else {
final String segmentIdentifier = selector.getSegment().getIdentifier();
cachePopulatorMap.put(
String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
new CachePopulator(cache, objectMapper, segmentCacheKey)
);
}
}
}
// Compile list of all segments not pulled from cache
for(Pair<ServerSelector, SegmentDescriptor> segment : segments) {
final DruidServer server = segment.lhs.pick();
List<SegmentDescriptor> descriptors = serverSegments.get(server);
if (descriptors == null) {
descriptors = Lists.newArrayList();
serverSegments.put(server, descriptors);
}
descriptors.add(segment.rhs);
}
return new LazySequence<T>(
new Supplier<Sequence<T>>()
{
@ -264,7 +276,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
@SuppressWarnings("unchecked")
private void addSequencesFromServer(ArrayList<Pair<DateTime, Sequence<T>>> listOfSequences)
{
for (Map.Entry<DruidServer, List<SegmentDescriptor>> entry : segs.entrySet()) {
for (Map.Entry<DruidServer, List<SegmentDescriptor>> entry : serverSegments.entrySet()) {
final DruidServer server = entry.getKey();
final List<SegmentDescriptor> descriptors = entry.getValue();
@ -328,13 +340,29 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
);
}
private Cache.NamedKey computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey)
{
final Interval segmentQueryInterval = descriptor.getInterval();
final byte[] versionBytes = descriptor.getVersion().getBytes();
return new Cache.NamedKey(
segmentIdentifier, ByteBuffer
.allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
.putLong(segmentQueryInterval.getStartMillis())
.putLong(segmentQueryInterval.getEndMillis())
.put(versionBytes)
.putInt(descriptor.getPartitionNumber())
.put(queryCacheKey).array()
);
}
private static class CachePopulator
{
private final Cache cache;
private final ObjectMapper mapper;
private final byte[] key;
private final Cache.NamedKey key;
public CachePopulator(Cache cache, ObjectMapper mapper, byte[] key)
public CachePopulator(Cache cache, ObjectMapper mapper, Cache.NamedKey key)
{
this.cache = cache;
this.mapper = mapper;

View File

@ -19,13 +19,63 @@
package com.metamx.druid.client.cache;
import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.Map;
/**
* An interface to limit the operations that can be done on a Cache so that it is easier to reason about what
* is actually going to be done.
*/
public interface Cache
{
public byte[] get(byte[] key);
public void put(byte[] key, byte[] value);
public void close();
public byte[] get(NamedKey key);
public void put(NamedKey key, byte[] value);
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys);
public void close(String namespace);
public CacheStats getStats();
public class NamedKey
{
final public String namespace;
final public byte[] key;
public NamedKey(String namespace, byte[] key) {
Preconditions.checkArgument(namespace != null, "namespace must not be null");
Preconditions.checkArgument(key != null, "key must not be null");
this.namespace = namespace;
this.key = key;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NamedKey namedKey = (NamedKey) o;
if (!namespace.equals(namedKey.namespace)) {
return false;
}
if (!Arrays.equals(key, namedKey.key)) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = namespace.hashCode();
result = 31 * result + Arrays.hashCode(key);
return result;
}
}
}

View File

@ -1,28 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.client.cache;
/**
*/
public interface CacheBroker
{
public CacheStats getStats();
public Cache provideCache(String identifier);
}

View File

@ -27,21 +27,21 @@ import com.metamx.metrics.AbstractMonitor;
*/
public class CacheMonitor extends AbstractMonitor
{
private final CacheBroker cacheBroker;
private final Cache cache;
private volatile CacheStats prevCacheStats = null;
public CacheMonitor(
CacheBroker cacheBroker
Cache cache
)
{
this.cacheBroker = cacheBroker;
this.cache = cache;
}
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
final CacheStats currCacheStats = cacheBroker.getStats();
final CacheStats currCacheStats = cache.getStats();
final CacheStats deltaCacheStats = currCacheStats.delta(prevCacheStats);
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();

View File

@ -0,0 +1,158 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.client.cache;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class MapCache implements Cache
{
private final Map<ByteBuffer, byte[]> baseMap;
private final ByteCountingLRUMap byteCountingLRUMap;
private final Map<String, byte[]> namespaceId;
private final AtomicInteger ids;
private final Object clearLock = new Object();
private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0);
public static com.metamx.druid.client.cache.Cache create(final MapCacheConfig config)
{
return new MapCache(
new ByteCountingLRUMap(
config.getInitialSize(),
config.getLogEvictionCount(),
config.getSizeInBytes()
)
);
}
MapCache(
ByteCountingLRUMap byteCountingLRUMap
)
{
this.byteCountingLRUMap = byteCountingLRUMap;
this.baseMap = Collections.synchronizedMap(byteCountingLRUMap);
namespaceId = Maps.newHashMap();
ids = new AtomicInteger();
}
@Override
public CacheStats getStats()
{
return new CacheStats(
hitCount.get(),
missCount.get(),
byteCountingLRUMap.size(),
byteCountingLRUMap.getNumBytes(),
byteCountingLRUMap.getEvictionCount(),
0
);
}
@Override
public byte[] get(NamedKey key)
{
final byte[] retVal = baseMap.get(computeKey(getNamespaceId(key.namespace), key.key));
if (retVal == null) {
missCount.incrementAndGet();
} else {
hitCount.incrementAndGet();
}
return retVal;
}
@Override
public void put(NamedKey key, byte[] value)
{
synchronized (clearLock) {
baseMap.put(computeKey(getNamespaceId(key.namespace), key.key), value);
}
}
@Override
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{
Map<NamedKey, byte[]> retVal = Maps.newHashMap();
for(NamedKey key : keys) {
retVal.put(key, get(key));
}
return retVal;
}
@Override
public void close(String namespace)
{
byte[] idBytes;
synchronized (namespaceId) {
idBytes = getNamespaceId(namespace);
if(idBytes == null) return;
namespaceId.remove(namespace);
}
synchronized (clearLock) {
Iterator<ByteBuffer> iter = baseMap.keySet().iterator();
while (iter.hasNext()) {
ByteBuffer next = iter.next();
if (next.get(0) == idBytes[0]
&& next.get(1) == idBytes[1]
&& next.get(2) == idBytes[2]
&& next.get(3) == idBytes[3]) {
iter.remove();
}
}
}
}
private byte[] getNamespaceId(final String identifier)
{
synchronized (namespaceId) {
byte[] idBytes = namespaceId.get(identifier);
if (idBytes != null) {
return idBytes;
}
idBytes = Ints.toByteArray(ids.getAndIncrement());
namespaceId.put(identifier, idBytes);
return idBytes;
}
}
private ByteBuffer computeKey(byte[] idBytes, byte[] key)
{
final ByteBuffer retVal = ByteBuffer.allocate(key.length + 4).put(idBytes).put(key);
retVal.rewind();
return retVal;
}
}

View File

@ -1,165 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.client.cache;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class MapCacheBroker implements CacheBroker
{
private final Map<ByteBuffer, byte[]> baseMap;
private final ByteCountingLRUMap byteCountingLRUMap;
private final Map<String, Cache> cacheCache;
private final AtomicInteger ids;
private final Object clearLock = new Object();
private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0);
public static CacheBroker create(final MapCacheBrokerConfig config)
{
return new MapCacheBroker(
new ByteCountingLRUMap(
config.getInitialSize(),
config.getLogEvictionCount(),
config.getSizeInBytes()
)
);
}
MapCacheBroker(
ByteCountingLRUMap byteCountingLRUMap
)
{
this.byteCountingLRUMap = byteCountingLRUMap;
this.baseMap = Collections.synchronizedMap(byteCountingLRUMap);
cacheCache = Maps.newHashMap();
ids = new AtomicInteger();
}
@Override
public CacheStats getStats()
{
return new CacheStats(
hitCount.get(),
missCount.get(),
byteCountingLRUMap.size(),
byteCountingLRUMap.getNumBytes(),
byteCountingLRUMap.getEvictionCount(),
0
);
}
@Override
public Cache provideCache(final String identifier)
{
synchronized (cacheCache) {
final Cache cachedCache = cacheCache.get(identifier);
if (cachedCache != null) {
return cachedCache;
}
final byte[] myIdBytes = Ints.toByteArray(ids.getAndIncrement());
final Cache theCache = new Cache()
{
volatile boolean open = true;
@Override
public byte[] get(byte[] key)
{
if (open) {
final byte[] retVal = baseMap.get(computeKey(key));
if (retVal == null) {
missCount.incrementAndGet();
} else {
hitCount.incrementAndGet();
}
return retVal;
}
throw new ISE("Cache for identifier[%s] is closed.", identifier);
}
@Override
public void put(byte[] key, byte[] value)
{
synchronized (clearLock) {
if (open) {
baseMap.put(computeKey(key), value);
return;
}
}
throw new ISE("Cache for identifier[%s] is closed.", identifier);
}
@Override
public void close()
{
synchronized (cacheCache) {
cacheCache.remove(identifier);
}
synchronized (clearLock) {
if (open) {
open = false;
Iterator<ByteBuffer> iter = baseMap.keySet().iterator();
while (iter.hasNext()) {
ByteBuffer next = iter.next();
if (next.get(0) == myIdBytes[0]
&& next.get(1) == myIdBytes[1]
&& next.get(2) == myIdBytes[2]
&& next.get(3) == myIdBytes[3]) {
iter.remove();
}
}
}
}
}
private ByteBuffer computeKey(byte[] key)
{
final ByteBuffer retVal = ByteBuffer.allocate(key.length + 4).put(myIdBytes).put(key);
retVal.rewind();
return retVal;
}
};
cacheCache.put(identifier, theCache);
return theCache;
}
}
}

View File

@ -24,7 +24,7 @@ import org.skife.config.Default;
/**
*/
public abstract class MapCacheBrokerConfig
public abstract class MapCacheConfig
{
@Config("${prefix}.sizeInBytes")
@Default("0")

View File

@ -19,7 +19,9 @@
package com.metamx.druid.client.cache;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import net.iharder.base64.Base64;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder;
@ -27,25 +29,28 @@ import net.spy.memcached.DefaultHashAlgorithm;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.internal.BulkFuture;
import net.spy.memcached.transcoders.SerializingTranscoder;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
public class MemcachedCacheBroker implements CacheBroker
public class MemcachedCache implements Cache
{
public static MemcachedCacheBroker create(final MemcachedCacheBrokerConfig config)
public static MemcachedCache create(final MemcachedCacheConfig config)
{
try {
SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize());
// disable compression
transcoder.setCompressionThreshold(Integer.MAX_VALUE);
return new MemcachedCacheBroker(
return new MemcachedCache(
new MemcachedClient(
new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
@ -74,7 +79,7 @@ public class MemcachedCacheBroker implements CacheBroker
private final AtomicLong missCount = new AtomicLong(0);
private final AtomicLong timeoutCount = new AtomicLong(0);
MemcachedCacheBroker(MemcachedClientIF client, int timeout, int expiration) {
MemcachedCache(MemcachedClientIF client, int timeout, int expiration) {
this.timeout = timeout;
this.expiration = expiration;
this.client = client;
@ -94,52 +99,94 @@ public class MemcachedCacheBroker implements CacheBroker
}
@Override
public Cache provideCache(final String identifier)
public byte[] get(NamedKey key)
{
return new Cache()
{
@Override
public byte[] get(byte[] key)
{
Future<Object> future = client.asyncGet(computeKey(identifier, key));
try {
byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
if(bytes != null) {
hitCount.incrementAndGet();
}
else {
missCount.incrementAndGet();
}
return bytes;
}
catch(TimeoutException e) {
timeoutCount.incrementAndGet();
future.cancel(false);
return null;
}
catch(InterruptedException e) {
throw Throwables.propagate(e);
}
catch(ExecutionException e) {
throw Throwables.propagate(e);
}
Future<Object> future = client.asyncGet(computeKeyString(key));
try {
byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
if(bytes != null) {
hitCount.incrementAndGet();
}
@Override
public void put(byte[] key, byte[] value)
{
client.set(computeKey(identifier, key), expiration, value);
else {
missCount.incrementAndGet();
}
@Override
public void close()
{
// no resources to cleanup
}
};
return bytes;
}
catch(TimeoutException e) {
timeoutCount.incrementAndGet();
future.cancel(false);
return null;
}
catch(InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
catch(ExecutionException e) {
throw Throwables.propagate(e);
}
}
private String computeKey(String identifier, byte[] key) {
return identifier + Base64.encodeBytes(key, Base64.DONT_BREAK_LINES);
@Override
public void put(NamedKey key, byte[] value)
{
client.set(computeKeyString(key), expiration, value);
}
@Override
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{
Map<String, NamedKey> keyLookup = Maps.uniqueIndex(
keys,
new Function<NamedKey, String>()
{
@Override
public String apply(
@Nullable NamedKey input
)
{
return computeKeyString(input);
}
}
);
BulkFuture<Map<String, Object>> future = client.asyncGetBulk(keyLookup.keySet());
try {
Map<String, Object> some = future.getSome(timeout, TimeUnit.MILLISECONDS);
if(future.isTimeout()) {
future.cancel(false);
timeoutCount.incrementAndGet();
}
missCount.addAndGet(keyLookup.size() - some.size());
hitCount.addAndGet(some.size());
Map<NamedKey, byte[]> results = Maps.newHashMap();
for(Map.Entry<String, Object> entry : some.entrySet()) {
results.put(
keyLookup.get(entry.getKey()),
(byte[])entry.getValue()
);
}
return results;
}
catch(InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
catch(ExecutionException e) {
throw Throwables.propagate(e);
}
}
@Override
public void close(String namespace)
{
// no resources to cleanup
}
private static String computeKeyString(NamedKey key) {
return key.namespace + ":" + Base64.encodeBytes(key.key, Base64.DONT_BREAK_LINES);
}
}

View File

@ -3,7 +3,7 @@ package com.metamx.druid.client.cache;
import org.skife.config.Config;
import org.skife.config.Default;
public abstract class MemcachedCacheBrokerConfig
public abstract class MemcachedCacheConfig
{
@Config("${prefix}.expiration")
@Default("31536000")

View File

@ -34,13 +34,13 @@ import com.metamx.druid.client.BrokerServerView;
import com.metamx.druid.client.CachingClusteredClient;
import com.metamx.druid.client.ClientConfig;
import com.metamx.druid.client.ClientInventoryManager;
import com.metamx.druid.client.cache.CacheBroker;
import com.metamx.druid.client.cache.Cache;
import com.metamx.druid.client.cache.CacheConfig;
import com.metamx.druid.client.cache.CacheMonitor;
import com.metamx.druid.client.cache.MapCacheBroker;
import com.metamx.druid.client.cache.MapCacheBrokerConfig;
import com.metamx.druid.client.cache.MemcachedCacheBroker;
import com.metamx.druid.client.cache.MemcachedCacheBrokerConfig;
import com.metamx.druid.client.cache.MapCache;
import com.metamx.druid.client.cache.MapCacheConfig;
import com.metamx.druid.client.cache.MemcachedCache;
import com.metamx.druid.client.cache.MemcachedCacheConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
@ -78,7 +78,7 @@ public class BrokerNode extends QueryableNode<BrokerNode>
private QueryToolChestWarehouse warehouse = null;
private HttpClient brokerHttpClient = null;
private CacheBroker cacheBroker = null;
private Cache cache = null;
private boolean useDiscovery = true;
@ -122,15 +122,15 @@ public class BrokerNode extends QueryableNode<BrokerNode>
return this;
}
public CacheBroker getCacheBroker()
public Cache getCache()
{
initializeCacheBroker();
return cacheBroker;
return cache;
}
public BrokerNode setCacheBroker(CacheBroker cacheBroker)
public BrokerNode setCache(Cache cache)
{
checkFieldNotSetAndSet("cacheBroker", cacheBroker);
checkFieldNotSetAndSet("cache", cache);
return this;
}
@ -185,7 +185,7 @@ public class BrokerNode extends QueryableNode<BrokerNode>
final Lifecycle lifecycle = getLifecycle();
final List<Monitor> monitors = getMonitors();
monitors.add(new CacheMonitor(cacheBroker));
monitors.add(new CacheMonitor(cache));
startMonitoring(monitors);
final BrokerServerView view = new BrokerServerView(warehouse, getSmileMapper(), brokerHttpClient);
@ -194,7 +194,7 @@ public class BrokerNode extends QueryableNode<BrokerNode>
);
lifecycle.addManagedInstance(clientInventoryManager);
final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cacheBroker, getSmileMapper());
final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cache, getSmileMapper());
lifecycle.addManagedInstance(baseClient);
@ -239,25 +239,25 @@ public class BrokerNode extends QueryableNode<BrokerNode>
private void initializeCacheBroker()
{
if (cacheBroker == null) {
if (cache == null) {
String cacheType = getConfigFactory()
.build(CacheConfig.class)
.getType();
if (cacheType.equals(CACHE_TYPE_LOCAL)) {
setCacheBroker(
MapCacheBroker.create(
setCache(
MapCache.create(
getConfigFactory().buildWithReplacements(
MapCacheBrokerConfig.class,
MapCacheConfig.class,
ImmutableMap.of("prefix", CACHE_PROPERTY_PREFIX)
)
)
);
} else if (cacheType.equals(CACHE_TYPE_MEMCACHED)) {
setCacheBroker(
MemcachedCacheBroker.create(
setCache(
MemcachedCache.create(
getConfigFactory().buildWithReplacements(
MemcachedCacheBrokerConfig.class,
MemcachedCacheConfig.class,
ImmutableMap.of("prefix", CACHE_PROPERTY_PREFIX)
)
)

View File

@ -31,56 +31,53 @@ public class MapCacheBrokerTest
private static final byte[] HI = "hi".getBytes();
private static final byte[] HO = "ho".getBytes();
private ByteCountingLRUMap baseMap;
private MapCacheBroker broker;
private MapCache cache;
@Before
public void setUp() throws Exception
{
baseMap = new ByteCountingLRUMap(1024 * 1024);
broker = new MapCacheBroker(baseMap);
cache = new MapCache(baseMap);
}
@Test
public void testSanity() throws Exception
{
Cache aCache = broker.provideCache("a");
Cache theCache = broker.provideCache("the");
Assert.assertNull(aCache.get(HI));
Assert.assertNull(cache.get(new Cache.NamedKey("a", HI)));
Assert.assertEquals(0, baseMap.size());
put(aCache, HI, 1);
put(cache, "a", HI, 1);
Assert.assertEquals(1, baseMap.size());
Assert.assertEquals(1, get(aCache, HI));
Assert.assertNull(theCache.get(HI));
Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
put(theCache, HI, 2);
put(cache, "the", HI, 2);
Assert.assertEquals(2, baseMap.size());
Assert.assertEquals(1, get(aCache, HI));
Assert.assertEquals(2, get(theCache, HI));
Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertEquals(2, get(cache, "the", HI));
put(theCache, HO, 10);
put(cache, "the", HO, 10);
Assert.assertEquals(3, baseMap.size());
Assert.assertEquals(1, get(aCache, HI));
Assert.assertNull(aCache.get(HO));
Assert.assertEquals(2, get(theCache, HI));
Assert.assertEquals(10, get(theCache, HO));
Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
Assert.assertEquals(2, get(cache, "the", HI));
Assert.assertEquals(10, get(cache, "the", HO));
theCache.close();
cache.close("the");
Assert.assertEquals(1, baseMap.size());
Assert.assertEquals(1, get(aCache, HI));
Assert.assertNull(aCache.get(HO));
Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
aCache.close();
cache.close("a");
Assert.assertEquals(0, baseMap.size());
}
public void put(Cache cache, byte[] key, Integer value)
public void put(Cache cache, String namespace, byte[] key, Integer value)
{
cache.put(key, Ints.toByteArray(value));
cache.put(new Cache.NamedKey(namespace, key), Ints.toByteArray(value));
}
public int get(Cache cache, byte[] key)
public int get(Cache cache, String namespace, byte[] key)
{
return Ints.fromByteArray(cache.get(key));
return Ints.fromByteArray(cache.get(new Cache.NamedKey(namespace, key)));
}
}

View File

@ -3,6 +3,7 @@ package com.metamx.druid.client.cache;
import com.google.caliper.Param;
import com.google.caliper.Runner;
import com.google.caliper.SimpleBenchmark;
import com.google.common.collect.Lists;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.DefaultHashAlgorithm;
@ -11,17 +12,19 @@ import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.transcoders.SerializingTranscoder;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
{
private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_";
public static final String NAMESPACE = "default";
private MemcachedCacheBroker broker;
private MemcachedCache cache;
private MemcachedClientIF client;
private Cache cache;
private static byte[] randBytes;
@Param({"localhost:11211"}) String hosts;
@ -39,8 +42,6 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
// disable compression
transcoder.setCompressionThreshold(Integer.MAX_VALUE);
System.out.println(String.format("Using memcached hosts [%s]", hosts));
client = new MemcachedClient(
new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
@ -53,14 +54,12 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
AddrUtil.getAddresses(hosts)
);
broker = new MemcachedCacheBroker(
cache = new MemcachedCache(
client,
500, // 500 milliseconds
30000, // 30 seconds
3600 // 1 hour
);
cache = broker.provideCache("default");
randBytes = new byte[objectSize * 1024];
new Random(0).nextBytes(randBytes);
@ -69,15 +68,14 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
@Override
protected void tearDown() throws Exception
{
client.flush();
client.shutdown();
client.shutdown(1, TimeUnit.MINUTES);
}
public void timePutObjects(int reps) {
for(int i = 0; i < reps; ++i) {
for(int k = 0; k < objectCount; ++k) {
String key = BASE_KEY + i;
cache.put(key.getBytes(), randBytes);
String key = BASE_KEY + k;
cache.put(new Cache.NamedKey(NAMESPACE, key.getBytes()), randBytes);
}
// make sure the write queue is empty
client.waitForQueues(1, TimeUnit.HOURS);
@ -89,8 +87,25 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
long count = 0;
for (int i = 0; i < reps; i++) {
for(int k = 0; k < objectCount; ++k) {
String key = BASE_KEY + i;
bytes = cache.get(key.getBytes());
String key = BASE_KEY + k;
bytes = cache.get(new Cache.NamedKey(NAMESPACE, key.getBytes()));
count += bytes.length;
}
}
return count;
}
public long timeBulkGetObjects(int reps) {
long count = 0;
for (int i = 0; i < reps; i++) {
List<Cache.NamedKey> keys = Lists.newArrayList();
for(int k = 0; k < objectCount; ++k) {
String key = BASE_KEY + k;
keys.add(new Cache.NamedKey(NAMESPACE, key.getBytes()));
}
Map<Cache.NamedKey, byte[]> results = cache.getBulk(keys);
for(Cache.NamedKey key : keys) {
byte[] bytes = results.get(key);
count += bytes.length;
}
}

View File

@ -19,6 +19,8 @@
package com.metamx.druid.client.cache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
@ -27,6 +29,7 @@ import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.NodeLocator;
import net.spy.memcached.internal.BulkFuture;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.transcoders.SerializingTranscoder;
import net.spy.memcached.transcoders.Transcoder;
import org.junit.Assert;
@ -51,51 +54,70 @@ public class MemcachedCacheBrokerTest
{
private static final byte[] HI = "hi".getBytes();
private static final byte[] HO = "ho".getBytes();
private MemcachedCacheBroker broker;
private MemcachedCache cache;
@Before
public void setUp() throws Exception
{
MemcachedClientIF client = new MockMemcachedClient();
broker = new MemcachedCacheBroker(client, 500, 3600);
cache = new MemcachedCache(client, 500, 3600);
}
@Test
public void testSanity() throws Exception
{
Cache aCache = broker.provideCache("a");
Cache theCache = broker.provideCache("the");
Assert.assertNull(cache.get(new Cache.NamedKey("a", HI)));
put(cache, "a", HI, 1);
Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
Assert.assertNull(aCache.get(HI));
put(aCache, HI, 1);
Assert.assertEquals(1, get(aCache, HI));
Assert.assertNull(theCache.get(HI));
put(cache, "the", HI, 2);
Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertEquals(2, get(cache, "the", HI));
put(theCache, HI, 2);
Assert.assertEquals(1, get(aCache, HI));
Assert.assertEquals(2, get(theCache, HI));
put(cache, "the", HO, 10);
Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
Assert.assertEquals(2, get(cache, "the", HI));
Assert.assertEquals(10, get(cache, "the", HO));
put(theCache, HO, 10);
Assert.assertEquals(1, get(aCache, HI));
Assert.assertNull(aCache.get(HO));
Assert.assertEquals(2, get(theCache, HI));
Assert.assertEquals(10, get(theCache, HO));
cache.close("the");
Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
theCache.close();
Assert.assertEquals(1, get(aCache, HI));
Assert.assertNull(aCache.get(HO));
aCache.close();
cache.close("a");
}
public void put(Cache cache, byte[] key, Integer value)
@Test
public void testGetBulk() throws Exception
{
cache.put(key, Ints.toByteArray(value));
Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
put(cache, "the", HI, 2);
put(cache, "the", HO, 10);
Cache.NamedKey key1 = new Cache.NamedKey("the", HI);
Cache.NamedKey key2 = new Cache.NamedKey("the", HO);
Map<Cache.NamedKey, byte[]> result = cache.getBulk(
Lists.newArrayList(
key1,
key2
)
);
Assert.assertEquals(2, Ints.fromByteArray(result.get(key1)));
Assert.assertEquals(10, Ints.fromByteArray(result.get(key2)));
}
public int get(Cache cache, byte[] key)
public void put(Cache cache, String namespace, byte[] key, Integer value)
{
return Ints.fromByteArray(cache.get(key));
cache.put(new Cache.NamedKey(namespace, key), Ints.toByteArray(value));
}
public int get(Cache cache, String namespace, byte[] key)
{
return Ints.fromByteArray(cache.get(new Cache.NamedKey(namespace, key)));
}
}
@ -365,9 +387,67 @@ class MockMemcachedClient implements MemcachedClientIF
}
@Override
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Iterator<String> keys, Transcoder<T> tc)
public <T> BulkFuture<Map<String, T>> asyncGetBulk(final Iterator<String> keys, final Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
return new BulkFuture<Map<String, T>>()
{
@Override
public boolean isTimeout()
{
return false;
}
@Override
public Map<String, T> getSome(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException
{
return get();
}
@Override
public OperationStatus getStatus()
{
return null;
}
@Override
public boolean cancel(boolean b)
{
return false;
}
@Override
public boolean isCancelled()
{
return false;
}
@Override
public boolean isDone()
{
return true;
}
@Override
public Map<String, T> get() throws InterruptedException, ExecutionException
{
Map<String, T> retVal = Maps.newHashMap();
while(keys.hasNext()) {
String key = keys.next();
CachedData data = theMap.get(key);
retVal.put(key, data != null ? tc.decode(data) : null);
}
return retVal;
}
@Override
public Map<String, T> get(long l, TimeUnit timeUnit)
throws InterruptedException, ExecutionException, TimeoutException
{
return get();
}
};
}
@Override
@ -383,9 +463,9 @@ class MockMemcachedClient implements MemcachedClientIF
}
@Override
public BulkFuture<Map<String, Object>> asyncGetBulk(Collection<String> keys)
public BulkFuture<Map<String, Object>> asyncGetBulk(final Collection<String> keys)
{
throw new UnsupportedOperationException("not implemented");
return asyncGetBulk(keys.iterator(), transcoder);
}
@Override

View File

@ -19,6 +19,7 @@
package com.metamx.druid.indexer.data;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.exception.FormattedException;
@ -56,7 +57,18 @@ public class StringInputRowParser
this.dimensionExclusions = Sets.newHashSet();
if (dimensionExclusions != null) {
this.dimensionExclusions.addAll(dimensionExclusions);
this.dimensionExclusions.addAll(
Lists.transform(
dimensionExclusions, new Function<String, String>()
{
@Override
public String apply(String s)
{
return s.toLowerCase();
}
}
)
);
}
this.dimensionExclusions.add(timestampSpec.getTimestampColumn());

View File

@ -23,21 +23,23 @@ import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.PeekingIterator;
import com.google.common.io.Closeables;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.ISE;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ParserUtils;
import com.metamx.druid.CombiningIterable;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.druid.shard.ShardSpec;
import com.metamx.druid.shard.SingleDimensionShardSpec;
@ -45,7 +47,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InvalidJobConfException;
@ -56,8 +58,11 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.codehaus.jackson.type.TypeReference;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
@ -65,20 +70,26 @@ import org.joda.time.Interval;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so,
* choosing the highest cardinality dimension that satisfies the criteria:
*
* <ul>
* <li>Must have exactly one value per row.</li>
* <li>Must not generate oversized partitions. A dimension with N rows having the same value will necessarily
* put all those rows in the same partition, and that partition may be much larger than the target size.</li>
* </ul>
*/
public class DeterminePartitionsJob implements Jobby
{
private static final Logger log = new Logger(DeterminePartitionsJob.class);
private static final Joiner keyJoiner = Joiner.on(",");
private static final Splitter keySplitter = Splitter.on(",");
private static final Joiner tabJoiner = HadoopDruidIndexerConfig.tabJoiner;
private static final Splitter tabSplitter = HadoopDruidIndexerConfig.tabSplitter;
@ -91,146 +102,314 @@ public class DeterminePartitionsJob implements Jobby
this.config = config;
}
public boolean run()
public static void injectSystemProperties(Job job)
{
try {
Job job = new Job(
new Configuration(),
String.format("%s-determine_partitions-%s", config.getDataSource(), config.getIntervals())
);
job.getConfiguration().set("io.sort.record.percent", "0.19");
for (String propName : System.getProperties().stringPropertyNames()) {
Configuration conf = job.getConfiguration();
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
final Configuration conf = job.getConfiguration();
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(DeterminePartitionsMapper.class);
job.setMapOutputValueClass(Text.class);
SortableBytes.useSortableBytesAsKey(job);
job.setCombinerClass(DeterminePartitionsCombiner.class);
job.setReducerClass(DeterminePartitionsReducer.class);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(DeterminePartitionsJob.DeterminePartitionsOutputFormat.class);
FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
config.addInputPaths(job);
config.intoConfiguration(job);
job.setJarByClass(DeterminePartitionsJob.class);
job.submit();
log.info("Job submitted, status available at %s", job.getTrackingURL());
final boolean retVal = job.waitForCompletion(true);
if (retVal) {
log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals());
FileSystem fileSystem = null;
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
DateTime bucket = segmentGranularity.getStart();
final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0));
if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(job.getConfiguration());
}
if (fileSystem.exists(partitionInfoPath)) {
List<ShardSpec> specs = config.jsonMapper.readValue(
Utils.openInputStream(job, partitionInfoPath), new TypeReference<List<ShardSpec>>()
{
}
);
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
for (int i = 0; i < specs.size(); ++i) {
actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
}
shardSpecs.put(bucket, actualSpecs);
}
else {
log.info("Path[%s] didn't exist!?", partitionInfoPath);
}
}
config.setShardSpecs(shardSpecs);
}
else {
log.info("Job completed unsuccessfully.");
}
return retVal;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
public static class DeterminePartitionsMapper extends Mapper<LongWritable, Text, BytesWritable, Text>
public boolean run()
{
private HadoopDruidIndexerConfig config;
private String partitionDimension;
private Parser parser;
private Function<String, DateTime> timestampConverter;
try {
/*
* Group by (timestamp, dimensions) so we can correctly count dimension values as they would appear
* in the final segment.
*/
if(!config.getPartitionsSpec().isAssumeGrouped()) {
final Job groupByJob = new Job(
new Configuration(),
String.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals())
);
injectSystemProperties(groupByJob);
groupByJob.setInputFormatClass(TextInputFormat.class);
groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class);
groupByJob.setMapOutputKeyClass(BytesWritable.class);
groupByJob.setMapOutputValueClass(NullWritable.class);
groupByJob.setCombinerClass(DeterminePartitionsGroupByReducer.class);
groupByJob.setReducerClass(DeterminePartitionsGroupByReducer.class);
groupByJob.setOutputKeyClass(BytesWritable.class);
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
groupByJob.setJarByClass(DeterminePartitionsJob.class);
config.addInputPaths(groupByJob);
config.intoConfiguration(groupByJob);
FileOutputFormat.setOutputPath(groupByJob, config.makeGroupedDataDir());
groupByJob.submit();
log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());
if(!groupByJob.waitForCompletion(true)) {
log.error("Job failed: %s", groupByJob.getJobID().toString());
return false;
}
} else {
log.info("Skipping group-by job.");
}
/*
* Read grouped data and determine appropriate partitions.
*/
final Job dimSelectionJob = new Job(
new Configuration(),
String.format("%s-determine_partitions_dimselection-%s", config.getDataSource(), config.getIntervals())
);
dimSelectionJob.getConfiguration().set("io.sort.record.percent", "0.19");
injectSystemProperties(dimSelectionJob);
if(!config.getPartitionsSpec().isAssumeGrouped()) {
// Read grouped data from the groupByJob.
dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionPostGroupByMapper.class);
dimSelectionJob.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.addInputPath(dimSelectionJob, config.makeGroupedDataDir());
} else {
// Directly read the source data, since we assume it's already grouped.
dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionAssumeGroupedMapper.class);
dimSelectionJob.setInputFormatClass(TextInputFormat.class);
config.addInputPaths(dimSelectionJob);
}
SortableBytes.useSortableBytesAsMapOutputKey(dimSelectionJob);
dimSelectionJob.setMapOutputValueClass(Text.class);
dimSelectionJob.setCombinerClass(DeterminePartitionsDimSelectionCombiner.class);
dimSelectionJob.setReducerClass(DeterminePartitionsDimSelectionReducer.class);
dimSelectionJob.setOutputKeyClass(BytesWritable.class);
dimSelectionJob.setOutputValueClass(Text.class);
dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
dimSelectionJob.setJarByClass(DeterminePartitionsJob.class);
config.intoConfiguration(dimSelectionJob);
FileOutputFormat.setOutputPath(dimSelectionJob, config.makeIntermediatePath());
dimSelectionJob.submit();
log.info(
"Job %s submitted, status available at: %s",
dimSelectionJob.getJobName(),
dimSelectionJob.getTrackingURL()
);
if(!dimSelectionJob.waitForCompletion(true)) {
log.error("Job failed: %s", dimSelectionJob.getJobID().toString());
return false;
}
/*
* Load partitions determined by the previous job.
*/
log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals());
FileSystem fileSystem = null;
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
DateTime bucket = segmentGranularity.getStart();
final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0));
if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
}
if (fileSystem.exists(partitionInfoPath)) {
List<ShardSpec> specs = config.jsonMapper.readValue(
Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
{
}
);
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
for (int i = 0; i < specs.size(); ++i) {
actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
}
shardSpecs.put(bucket, actualSpecs);
}
else {
log.info("Path[%s] didn't exist!?", partitionInfoPath);
}
}
config.setShardSpecs(shardSpecs);
return true;
} catch(Exception e) {
throw Throwables.propagate(e);
}
}
public static class DeterminePartitionsGroupByMapper extends HadoopDruidIndexerMapper<BytesWritable, NullWritable>
{
private QueryGranularity rollupGranularity = null;
@Override
protected void setup(Context context)
throws IOException, InterruptedException
{
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
partitionDimension = config.getPartitionDimension();
parser = config.getDataSpec().getParser();
timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat());
super.setup(context);
rollupGranularity = getConfig().getRollupSpec().getRollupGranularity();
}
@Override
protected void innerMap(
InputRow inputRow,
Text text,
Context context
) throws IOException, InterruptedException
{
// Create group key
// TODO -- There are more efficient ways to do this
final Map<String, Set<String>> dims = Maps.newTreeMap();
for(final String dim : inputRow.getDimensions()) {
final Set<String> dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim));
if(dimValues.size() > 0) {
dims.put(dim, dimValues);
}
}
final List<Object> groupKey = ImmutableList.of(
rollupGranularity.truncate(inputRow.getTimestampFromEpoch()),
dims
);
context.write(
new BytesWritable(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey)),
NullWritable.get()
);
}
}
public static class DeterminePartitionsGroupByReducer
extends Reducer<BytesWritable, NullWritable, BytesWritable, NullWritable>
{
@Override
protected void reduce(
BytesWritable key,
Iterable<NullWritable> values,
Context context
) throws IOException, InterruptedException
{
context.write(key, NullWritable.get());
}
}
/**
* This DimSelection mapper runs on data generated by our GroupBy job.
*/
public static class DeterminePartitionsDimSelectionPostGroupByMapper
extends Mapper<BytesWritable, NullWritable, BytesWritable, Text>
{
private DeterminePartitionsDimSelectionMapperHelper helper;
@Override
protected void setup(Context context)
throws IOException, InterruptedException
{
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
final String partitionDimension = config.getPartitionDimension();
helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension);
}
@Override
protected void map(
LongWritable key, Text value, Context context
BytesWritable key, NullWritable value, Context context
) throws IOException, InterruptedException
{
Map<String, Object> values = parser.parse(value.toString());
final DateTime timestamp;
final String tsStr = (String) values.get(config.getTimestampColumnName());
try {
timestamp = timestampConverter.apply(tsStr);
}
catch(IllegalArgumentException e) {
if(config.isIgnoreInvalidRows()) {
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
return; // we're ignoring this invalid row
}
else {
throw e;
}
}
final List<Object> timeAndDims = HadoopDruidIndexerConfig.jsonMapper.readValue(key.getBytes(), List.class);
final DateTime timestamp = new DateTime(timeAndDims.get(0));
final Map<String, Iterable<String>> dims = (Map<String, Iterable<String>>) timeAndDims.get(1);
helper.emitDimValueCounts(context, timestamp, dims);
}
}
/**
* This DimSelection mapper runs on raw input data that we assume has already been grouped.
*/
public static class DeterminePartitionsDimSelectionAssumeGroupedMapper
extends HadoopDruidIndexerMapper<BytesWritable, Text>
{
private DeterminePartitionsDimSelectionMapperHelper helper;
@Override
protected void setup(Context context)
throws IOException, InterruptedException
{
super.setup(context);
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
final String partitionDimension = config.getPartitionDimension();
helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension);
}
@Override
protected void innerMap(
InputRow inputRow,
Text text,
Context context
) throws IOException, InterruptedException
{
final Map<String, Iterable<String>> dims = Maps.newHashMap();
for(final String dim : inputRow.getDimensions()) {
dims.put(dim, inputRow.getDimension(dim));
}
helper.emitDimValueCounts(context, new DateTime(inputRow.getTimestampFromEpoch()), dims);
}
}
/**
* Since we have two slightly different DimSelectionMappers, this class encapsulates the shared logic for
* emitting dimension value counts.
*/
public static class DeterminePartitionsDimSelectionMapperHelper
{
private final HadoopDruidIndexerConfig config;
private final String partitionDimension;
public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension)
{
this.config = config;
this.partitionDimension = partitionDimension;
}
public void emitDimValueCounts(
TaskInputOutputContext<? extends Writable, ? extends Writable, BytesWritable, Text> context,
DateTime timestamp,
Map<String, Iterable<String>> dims
) throws IOException, InterruptedException
{
final Optional<Interval> maybeInterval = config.getGranularitySpec().bucketInterval(timestamp);
if(maybeInterval.isPresent()) {
final DateTime bucket = maybeInterval.get().getStart();
final String outKey = keyJoiner.join(bucket.toString(), partitionDimension);
final Object dimValue = values.get(partitionDimension);
if (! (dimValue instanceof String)) {
throw new IAE("Cannot partition on a tag-style dimension[%s], line was[%s]", partitionDimension, value);
if(!maybeInterval.isPresent()) {
throw new ISE("WTF?! No bucket found for timestamp: %s", timestamp);
}
final Interval interval = maybeInterval.get();
final byte[] groupKey = interval.getStart().toString().getBytes(Charsets.UTF_8);
for(final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
final String dim = dimAndValues.getKey();
if(partitionDimension == null || partitionDimension.equals(dim)) {
final Iterable<String> dimValues = dimAndValues.getValue();
if(Iterables.size(dimValues) == 1) {
// Emit this value.
write(context, groupKey, new DimValueCount(dim, Iterables.getOnlyElement(dimValues), 1));
} else {
// This dimension is unsuitable for partitioning. Poison it by emitting a negative value.
write(context, groupKey, new DimValueCount(dim, "", -1));
}
}
final byte[] groupKey = outKey.getBytes(Charsets.UTF_8);
write(context, groupKey, "", 1);
write(context, groupKey, (String) dimValue, 1);
}
}
}
private static abstract class DeterminePartitionsBaseReducer extends Reducer<BytesWritable, Text, BytesWritable, Text>
private static abstract class DeterminePartitionsDimSelectionBaseReducer
extends Reducer<BytesWritable, Text, BytesWritable, Text>
{
protected static volatile HadoopDruidIndexerConfig config = null;
@ -240,7 +419,7 @@ public class DeterminePartitionsJob implements Jobby
throws IOException, InterruptedException
{
if (config == null) {
synchronized (DeterminePartitionsBaseReducer.class) {
synchronized (DeterminePartitionsDimSelectionBaseReducer.class) {
if (config == null) {
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
}
@ -255,166 +434,275 @@ public class DeterminePartitionsJob implements Jobby
{
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
final Iterable<Pair<String, Long>> combinedIterable = combineRows(values);
final Iterable<DimValueCount> combinedIterable = combineRows(values);
innerReduce(context, keyBytes, combinedIterable);
}
protected abstract void innerReduce(
Context context, SortableBytes keyBytes, Iterable<Pair<String, Long>> combinedIterable
Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable
) throws IOException, InterruptedException;
private Iterable<Pair<String, Long>> combineRows(Iterable<Text> input)
private Iterable<DimValueCount> combineRows(Iterable<Text> input)
{
return new CombiningIterable<Pair<String, Long>>(
return new CombiningIterable<DimValueCount>(
Iterables.transform(
input,
new Function<Text, Pair<String, Long>>()
new Function<Text, DimValueCount>()
{
@Override
public Pair<String, Long> apply(Text input)
public DimValueCount apply(Text input)
{
Iterator<String> splits = tabSplitter.split(input.toString()).iterator();
return new Pair<String, Long>(splits.next(), Long.parseLong(splits.next()));
return DimValueCount.fromText(input);
}
}
),
new Comparator<Pair<String, Long>>()
new Comparator<DimValueCount>()
{
@Override
public int compare(Pair<String, Long> o1, Pair<String, Long> o2)
public int compare(DimValueCount o1, DimValueCount o2)
{
return o1.lhs.compareTo(o2.lhs);
return ComparisonChain.start().compare(o1.dim, o2.dim).compare(o1.value, o2.value).result();
}
},
new BinaryFn<Pair<String, Long>, Pair<String, Long>, Pair<String, Long>>()
new BinaryFn<DimValueCount, DimValueCount, DimValueCount>()
{
@Override
public Pair<String, Long> apply(Pair<String, Long> arg1, Pair<String, Long> arg2)
public DimValueCount apply(DimValueCount arg1, DimValueCount arg2)
{
if (arg2 == null) {
return arg1;
}
return new Pair<String, Long>(arg1.lhs, arg1.rhs + arg2.rhs);
// Respect "poisoning" (negative values mean we can't use this dimension)
final int newNumRows = (arg1.numRows >= 0 && arg2.numRows >= 0 ? arg1.numRows + arg2.numRows : -1);
return new DimValueCount(arg1.dim, arg1.value, newNumRows);
}
}
);
}
}
public static class DeterminePartitionsCombiner extends DeterminePartitionsBaseReducer
public static class DeterminePartitionsDimSelectionCombiner extends DeterminePartitionsDimSelectionBaseReducer
{
@Override
protected void innerReduce(
Context context, SortableBytes keyBytes, Iterable<Pair<String, Long>> combinedIterable
Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable
) throws IOException, InterruptedException
{
for (Pair<String, Long> pair : combinedIterable) {
write(context, keyBytes.getGroupKey(), pair.lhs, pair.rhs);
for (DimValueCount dvc : combinedIterable) {
write(context, keyBytes.getGroupKey(), dvc);
}
}
}
public static class DeterminePartitionsReducer extends DeterminePartitionsBaseReducer
public static class DeterminePartitionsDimSelectionReducer extends DeterminePartitionsDimSelectionBaseReducer
{
String previousBoundary;
long runningTotal;
private static final double SHARD_COMBINE_THRESHOLD = 0.25;
private static final double SHARD_OVERSIZE_THRESHOLD = 1.5;
@Override
protected void innerReduce(
Context context, SortableBytes keyBytes, Iterable<Pair<String, Long>> combinedIterable
Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable
) throws IOException, InterruptedException
{
PeekingIterator<Pair<String, Long>> iterator = Iterators.peekingIterator(combinedIterable.iterator());
Pair<String, Long> totalPair = iterator.next();
PeekingIterator<DimValueCount> iterator = Iterators.peekingIterator(combinedIterable.iterator());
Preconditions.checkState(totalPair.lhs.equals(""), "Total pair value was[%s]!?", totalPair.lhs);
long totalRows = totalPair.rhs;
// "iterator" will take us over many candidate dimensions
DimPartitions currentDimPartitions = null;
DimPartition currentDimPartition = null;
String currentDimPartitionStart = null;
boolean currentDimSkip = false;
long numPartitions = Math.max(totalRows / config.getTargetPartitionSize(), 1);
long expectedRowsPerPartition = totalRows / numPartitions;
// We'll store possible partitions in here
final Map<String, DimPartitions> dimPartitionss = Maps.newHashMap();
class PartitionsList extends ArrayList<ShardSpec>
{
}
List<ShardSpec> partitions = new PartitionsList();
while(iterator.hasNext()) {
final DimValueCount dvc = iterator.next();
runningTotal = 0;
Pair<String, Long> prev = null;
previousBoundary = null;
while (iterator.hasNext()) {
Pair<String, Long> curr = iterator.next();
if (runningTotal > expectedRowsPerPartition) {
Preconditions.checkNotNull(
prev, "Prev[null] while runningTotal[%s] was > expectedRows[%s]!?", runningTotal, expectedRowsPerPartition
);
addPartition(partitions, curr.lhs);
if(currentDimPartitions == null || !currentDimPartitions.dim.equals(dvc.dim)) {
// Starting a new dimension! Exciting!
currentDimPartitions = new DimPartitions(dvc.dim);
currentDimPartition = new DimPartition();
currentDimPartitionStart = null;
currentDimSkip = false;
}
runningTotal += curr.rhs;
prev = curr;
// Respect poisoning
if(!currentDimSkip && dvc.numRows < 0) {
log.info("Cannot partition on multi-valued dimension: %s", dvc.dim);
currentDimSkip = true;
}
if(currentDimSkip) {
continue;
}
// See if we need to cut a new partition ending immediately before this dimension value
if(currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows >= config.getTargetPartitionSize()) {
final ShardSpec shardSpec = new SingleDimensionShardSpec(
currentDimPartitions.dim,
currentDimPartitionStart,
dvc.value,
currentDimPartitions.partitions.size()
);
log.info(
"Adding possible shard with %,d rows and %,d unique values: %s",
currentDimPartition.rows,
currentDimPartition.cardinality,
shardSpec
);
currentDimPartition.shardSpec = shardSpec;
currentDimPartitions.partitions.add(currentDimPartition);
currentDimPartition = new DimPartition();
currentDimPartitionStart = dvc.value;
}
// Update counters
currentDimPartition.cardinality ++;
currentDimPartition.rows += dvc.numRows;
if(!iterator.hasNext() || !currentDimPartitions.dim.equals(iterator.peek().dim)) {
// Finalize the current dimension
if(currentDimPartition.rows > 0) {
// One more shard to go
final ShardSpec shardSpec;
if (currentDimPartitions.partitions.isEmpty()) {
shardSpec = new NoneShardSpec();
} else {
if(currentDimPartition.rows < config.getTargetPartitionSize() * SHARD_COMBINE_THRESHOLD) {
// Combine with previous shard
final DimPartition previousDimPartition = currentDimPartitions.partitions.remove(
currentDimPartitions.partitions.size() - 1
);
final SingleDimensionShardSpec previousShardSpec = (SingleDimensionShardSpec) previousDimPartition.shardSpec;
shardSpec = new SingleDimensionShardSpec(
currentDimPartitions.dim,
previousShardSpec.getStart(),
null,
previousShardSpec.getPartitionNum()
);
log.info("Removing possible shard: %s", previousShardSpec);
currentDimPartition.rows += previousDimPartition.rows;
currentDimPartition.cardinality += previousDimPartition.cardinality;
} else {
// Create new shard
shardSpec = new SingleDimensionShardSpec(
currentDimPartitions.dim,
currentDimPartitionStart,
null,
currentDimPartitions.partitions.size()
);
}
}
log.info(
"Adding possible shard with %,d rows and %,d unique values: %s",
currentDimPartition.rows,
currentDimPartition.cardinality,
shardSpec
);
currentDimPartition.shardSpec = shardSpec;
currentDimPartitions.partitions.add(currentDimPartition);
}
log.info(
"Completed dimension[%s]: %,d possible shards with %,d unique values",
currentDimPartitions.dim,
currentDimPartitions.partitions.size(),
currentDimPartitions.getCardinality()
);
// Add ourselves to the partitions map
dimPartitionss.put(currentDimPartitions.dim, currentDimPartitions);
}
}
if (partitions.isEmpty()) {
partitions.add(new NoneShardSpec());
} else if (((double) runningTotal / (double) expectedRowsPerPartition) < 0.25) {
final SingleDimensionShardSpec lastSpec = (SingleDimensionShardSpec) partitions.remove(partitions.size() - 1);
partitions.add(
new SingleDimensionShardSpec(
config.getPartitionDimension(),
lastSpec.getStart(),
null,
lastSpec.getPartitionNum()
)
);
} else {
partitions.add(
new SingleDimensionShardSpec(
config.getPartitionDimension(),
previousBoundary,
null,
partitions.size()
)
);
// Choose best dimension
if(dimPartitionss.isEmpty()) {
throw new ISE("No suitable partitioning dimension found!");
}
DateTime bucket = new DateTime(
Iterables.get(keySplitter.split(new String(keyBytes.getGroupKey(), Charsets.UTF_8)), 0)
);
OutputStream out = Utils.makePathAndOutputStream(
final int totalRows = dimPartitionss.values().iterator().next().getRows();
int maxCardinality = -1;
DimPartitions maxCardinalityPartitions = null;
for(final DimPartitions dimPartitions : dimPartitionss.values()) {
if(dimPartitions.getRows() != totalRows) {
throw new ISE(
"WTF?! Dimension[%s] row count %,d != expected row count %,d",
dimPartitions.dim,
dimPartitions.getRows(),
totalRows
);
}
// Make sure none of these shards are oversized
boolean oversized = false;
for(final DimPartition partition : dimPartitions.partitions) {
if(partition.rows > config.getTargetPartitionSize() * SHARD_OVERSIZE_THRESHOLD) {
log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
oversized = true;
}
}
if(oversized) {
continue;
}
if(dimPartitions.getCardinality() > maxCardinality) {
maxCardinality = dimPartitions.getCardinality();
maxCardinalityPartitions = dimPartitions;
}
}
if(maxCardinalityPartitions == null) {
throw new ISE("No suitable partitioning dimension found!");
}
final DateTime bucket = new DateTime(new String(keyBytes.getGroupKey(), Charsets.UTF_8));
final OutputStream out = Utils.makePathAndOutputStream(
context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles()
);
for (ShardSpec partition : partitions) {
log.info("%s", partition);
final List<ShardSpec> chosenShardSpecs = Lists.transform(
maxCardinalityPartitions.partitions, new Function<DimPartition, ShardSpec>()
{
@Override
public ShardSpec apply(DimPartition dimPartition)
{
return dimPartition.shardSpec;
}
}
);
log.info("Chosen partitions:");
for (ShardSpec shardSpec : chosenShardSpecs) {
log.info(" %s", shardSpec);
}
try {
config.jsonMapper.writeValue(out, partitions);
HadoopDruidIndexerConfig.jsonMapper.writerWithType(new TypeReference<List<ShardSpec>>() {}).writeValue(
out,
chosenShardSpecs
);
}
finally {
Closeables.close(out, false);
}
}
private void addPartition(List<ShardSpec> partitions, String boundary)
{
partitions.add(
new SingleDimensionShardSpec(
config.getPartitionDimension(),
previousBoundary,
boundary,
partitions.size()
)
);
previousBoundary = boundary;
runningTotal = 0;
}
}
public static class DeterminePartitionsOutputFormat extends FileOutputFormat
public static class DeterminePartitionsDimSelectionOutputFormat extends FileOutputFormat
{
@Override
public RecordWriter getRecordWriter(final TaskAttemptContext job) throws IOException, InterruptedException
@ -444,17 +732,81 @@ public class DeterminePartitionsJob implements Jobby
}
}
private static class DimPartitions
{
public final String dim;
public final List<DimPartition> partitions = Lists.newArrayList();
private DimPartitions(String dim)
{
this.dim = dim;
}
public int getCardinality()
{
int sum = 0;
for(final DimPartition dimPartition : partitions) {
sum += dimPartition.cardinality;
}
return sum;
}
public int getRows()
{
int sum = 0;
for(final DimPartition dimPartition : partitions) {
sum += dimPartition.rows;
}
return sum;
}
}
private static class DimPartition
{
public ShardSpec shardSpec = null;
public int cardinality = 0;
public int rows = 0;
}
private static class DimValueCount
{
public final String dim;
public final String value;
public final int numRows;
private DimValueCount(String dim, String value, int numRows)
{
this.dim = dim;
this.value = value;
this.numRows = numRows;
}
public Text toText()
{
return new Text(tabJoiner.join(dim, String.valueOf(numRows), value));
}
public static DimValueCount fromText(Text text)
{
final Iterator<String> splits = tabSplitter.limit(3).split(text.toString()).iterator();
final String dim = splits.next();
final int numRows = Integer.parseInt(splits.next());
final String value = splits.next();
return new DimValueCount(dim, value, numRows);
}
}
private static void write(
TaskInputOutputContext<? extends Writable, ? extends Writable, BytesWritable, Text> context,
final byte[] groupKey,
String value,
long numRows
DimValueCount dimValueCount
)
throws IOException, InterruptedException
{
context.write(
new SortableBytes(groupKey, value.getBytes(HadoopDruidIndexerConfig.javaNativeCharset)).toBytesWritable(),
new Text(tabJoiner.join(value, numRows))
new SortableBytes(groupKey, tabJoiner.join(dimValueCount.dim, dimValueCount.value).getBytes(HadoopDruidIndexerConfig.javaNativeCharset)).toBytesWritable(),
dimValueCount.toText()
);
}
}

View File

@ -34,15 +34,20 @@ import com.metamx.common.MapUtils;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import com.metamx.druid.RegisteringNode;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.serde.Registererer;
import com.metamx.druid.indexer.data.DataSpec;
import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.indexer.data.TimestampSpec;
import com.metamx.druid.indexer.data.ToLowercaseDataSpec;
import com.metamx.druid.indexer.granularity.GranularitySpec;
import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.indexer.partitions.PartitionsSpec;
import com.metamx.druid.indexer.path.PathSpec;
import com.metamx.druid.indexer.rollup.DataRollupSpec;
import com.metamx.druid.indexer.updater.UpdaterJobSpec;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.shard.ShardSpec;
import com.metamx.druid.utils.JodaUtils;
@ -50,6 +55,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
@ -60,8 +66,6 @@ import org.joda.time.format.ISODateTimeFormat;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
@ -162,8 +166,6 @@ public class HadoopDruidIndexerConfig
private static final String CONFIG_PROPERTY = "druid.indexer.config";
@Deprecated
private volatile List<Interval> intervals;
private volatile String dataSource;
private volatile String timestampColumnName;
private volatile String timestampFormat;
@ -175,8 +177,7 @@ public class HadoopDruidIndexerConfig
private volatile String jobOutputDir;
private volatile String segmentOutputDir;
private volatile DateTime version = new DateTime();
private volatile String partitionDimension;
private volatile Long targetPartitionSize;
private volatile PartitionsSpec partitionsSpec;
private volatile boolean leaveIntermediate = false;
private volatile boolean cleanupOnFailure = true;
private volatile Map<DateTime, List<HadoopyShardSpec>> shardSpecs = ImmutableMap.of();
@ -186,22 +187,97 @@ public class HadoopDruidIndexerConfig
private volatile boolean ignoreInvalidRows = false;
private volatile List<String> registererers = Lists.newArrayList();
@JsonCreator
public HadoopDruidIndexerConfig(
final @JsonProperty("intervals") List<Interval> intervals,
final @JsonProperty("dataSource") String dataSource,
final @JsonProperty("timestampColumnName") String timestampColumnName,
final @JsonProperty("timestampFormat") String timestampFormat,
final @JsonProperty("dataSpec") DataSpec dataSpec,
final @JsonProperty("segmentGranularity") Granularity segmentGranularity,
final @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
final @JsonProperty("pathSpec") PathSpec pathSpec,
final @JsonProperty("jobOutputDir") String jobOutputDir,
final @JsonProperty("segmentOutputDir") String segmentOutputDir,
final @JsonProperty("version") DateTime version,
final @JsonProperty("partitionDimension") String partitionDimension,
final @JsonProperty("targetPartitionSize") Long targetPartitionSize,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") boolean cleanupOnFailure,
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
final @JsonProperty("updaterJobSpec") UpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("registererers") List<String> registererers
)
{
this.dataSource = dataSource;
this.timestampColumnName = timestampColumnName;
this.timestampFormat = timestampFormat;
this.dataSpec = dataSpec;
this.granularitySpec = granularitySpec;
this.pathSpec = pathSpec;
this.jobOutputDir = jobOutputDir;
this.segmentOutputDir = segmentOutputDir;
this.version = version;
this.partitionsSpec = partitionsSpec;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = cleanupOnFailure;
this.shardSpecs = shardSpecs;
this.overwriteFiles = overwriteFiles;
this.rollupSpec = rollupSpec;
this.updaterJobSpec = updaterJobSpec;
this.ignoreInvalidRows = ignoreInvalidRows;
this.registererers = registererers;
if(partitionsSpec != null) {
Preconditions.checkArgument(
partitionDimension == null && targetPartitionSize == null,
"Cannot mix partitionsSpec with partitionDimension/targetPartitionSize"
);
this.partitionsSpec = partitionsSpec;
} else {
// Backwards compatibility
this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, false);
}
if(granularitySpec != null) {
Preconditions.checkArgument(
segmentGranularity == null && intervals == null,
"Cannot mix granularitySpec with segmentGranularity/intervals"
);
} else {
// Backwards compatibility
this.segmentGranularity = segmentGranularity;
if(segmentGranularity != null && intervals != null) {
this.granularitySpec = new UniformGranularitySpec(segmentGranularity, intervals);
}
}
}
/**
* Default constructor does nothing. The caller is expected to use the various setX methods.
*/
public HadoopDruidIndexerConfig()
{
}
public List<Interval> getIntervals()
{
return JodaUtils.condenseIntervals(getGranularitySpec().bucketIntervals());
}
@Deprecated
@JsonProperty
public void setIntervals(List<Interval> intervals)
{
Preconditions.checkState(this.granularitySpec == null, "Use setGranularitySpec");
Preconditions.checkState(this.granularitySpec == null, "Cannot mix setIntervals with granularitySpec");
Preconditions.checkState(this.segmentGranularity != null, "Cannot use setIntervals without segmentGranularity");
// For backwards compatibility
this.intervals = intervals;
if (this.segmentGranularity != null) {
this.granularitySpec = new UniformGranularitySpec(this.segmentGranularity, this.intervals);
}
this.granularitySpec = new UniformGranularitySpec(this.segmentGranularity, intervals);
}
@JsonProperty
@ -237,6 +313,11 @@ public class HadoopDruidIndexerConfig
this.timestampFormat = timestampFormat;
}
public TimestampSpec getTimestampSpec()
{
return new TimestampSpec(timestampColumnName, timestampFormat);
}
@JsonProperty
public DataSpec getDataSpec()
{
@ -248,17 +329,30 @@ public class HadoopDruidIndexerConfig
this.dataSpec = new ToLowercaseDataSpec(dataSpec);
}
@Deprecated
@JsonProperty
public void setSegmentGranularity(Granularity segmentGranularity)
public StringInputRowParser getParser()
{
Preconditions.checkState(this.granularitySpec == null, "Use setGranularitySpec");
final List<String> dimensionExclusions;
// For backwards compatibility
this.segmentGranularity = segmentGranularity;
if (this.intervals != null) {
this.granularitySpec = new UniformGranularitySpec(this.segmentGranularity, this.intervals);
if(getDataSpec().hasCustomDimensions()) {
dimensionExclusions = null;
} else {
dimensionExclusions = Lists.newArrayList();
dimensionExclusions.add(getTimestampColumnName());
dimensionExclusions.addAll(
Lists.transform(
getRollupSpec().getAggs(), new Function<AggregatorFactory, String>()
{
@Override
public String apply(AggregatorFactory aggregatorFactory)
{
return aggregatorFactory.getName();
}
}
)
);
}
return new StringInputRowParser(getTimestampSpec(), getDataSpec(), dimensionExclusions);
}
@JsonProperty
@ -269,15 +363,20 @@ public class HadoopDruidIndexerConfig
public void setGranularitySpec(GranularitySpec granularitySpec)
{
Preconditions.checkState(this.intervals == null, "Use setGranularitySpec instead of setIntervals");
Preconditions.checkState(
this.segmentGranularity == null,
"Use setGranularitySpec instead of setSegmentGranularity"
);
this.granularitySpec = granularitySpec;
}
@JsonProperty
public PartitionsSpec getPartitionsSpec()
{
return partitionsSpec;
}
public void setPartitionsSpec(PartitionsSpec partitionsSpec)
{
this.partitionsSpec = partitionsSpec;
}
@JsonProperty
public PathSpec getPathSpec()
{
@ -322,31 +421,19 @@ public class HadoopDruidIndexerConfig
this.version = version;
}
@JsonProperty
public String getPartitionDimension()
{
return partitionDimension;
}
public void setPartitionDimension(String partitionDimension)
{
this.partitionDimension = (partitionDimension == null) ? partitionDimension : partitionDimension;
return partitionsSpec.getPartitionDimension();
}
public boolean partitionByDimension()
{
return partitionDimension != null;
return partitionsSpec.isDeterminingPartitions();
}
@JsonProperty
public Long getTargetPartitionSize()
{
return targetPartitionSize;
}
public void setTargetPartitionSize(Long targetPartitionSize)
{
this.targetPartitionSize = targetPartitionSize;
return partitionsSpec.getTargetPartitionSize();
}
public boolean isUpdaterJobSpecSet()
@ -447,21 +534,15 @@ public class HadoopDruidIndexerConfig
********************************************/
/**
* Get the proper bucket for this "row"
* Get the proper bucket for some input row.
*
* @param theMap a Map that represents a "row", keys are column names, values are, well, values
* @param inputRow an InputRow
*
* @return the Bucket that this row belongs to
*/
public Optional<Bucket> getBucket(Map<String, String> theMap)
public Optional<Bucket> getBucket(InputRow inputRow)
{
final Optional<Interval> timeBucket = getGranularitySpec().bucketInterval(
new DateTime(
theMap.get(
getTimestampColumnName()
)
)
);
final Optional<Interval> timeBucket = getGranularitySpec().bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()));
if (!timeBucket.isPresent()) {
return Optional.absent();
}
@ -473,7 +554,7 @@ public class HadoopDruidIndexerConfig
for (final HadoopyShardSpec hadoopyShardSpec : shards) {
final ShardSpec actualSpec = hadoopyShardSpec.getActualSpec();
if (actualSpec.isInChunk(theMap)) {
if (actualSpec.isInChunk(inputRow)) {
return Optional.of(
new Bucket(
hadoopyShardSpec.getShardNum(),
@ -484,7 +565,7 @@ public class HadoopDruidIndexerConfig
}
}
throw new ISE("row[%s] doesn't fit in any shard[%s]", theMap, shards);
throw new ISE("row[%s] doesn't fit in any shard[%s]", inputRow, shards);
}
public Set<Interval> getSegmentGranularIntervals()
@ -566,6 +647,11 @@ public class HadoopDruidIndexerConfig
return new Path(makeIntermediatePath(), "segmentDescriptorInfo");
}
public Path makeGroupedDataDir()
{
return new Path(makeIntermediatePath(), "groupedData");
}
public Path makeDescriptorInfoPath(DataSegment segment)
{
return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", "")));
@ -626,10 +712,5 @@ public class HadoopDruidIndexerConfig
final int nIntervals = getIntervals().size();
Preconditions.checkArgument(nIntervals > 0, "intervals.size()[%s] <= 0", nIntervals);
if (partitionByDimension()) {
Preconditions.checkNotNull(partitionDimension);
Preconditions.checkNotNull(targetPartitionSize);
}
}
}

View File

@ -0,0 +1,66 @@
package com.metamx.druid.indexer;
import com.metamx.common.RE;
import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.joda.time.DateTime;
import java.io.IOException;
public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<LongWritable, Text, KEYOUT, VALUEOUT>
{
private HadoopDruidIndexerConfig config;
private StringInputRowParser parser;
@Override
protected void setup(Context context)
throws IOException, InterruptedException
{
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
parser = config.getParser();
}
public HadoopDruidIndexerConfig getConfig()
{
return config;
}
public StringInputRowParser getParser()
{
return parser;
}
@Override
protected void map(
LongWritable key, Text value, Context context
) throws IOException, InterruptedException
{
try {
final InputRow inputRow;
try {
inputRow = parser.parse(value.toString());
}
catch (IllegalArgumentException e) {
if (config.isIgnoreInvalidRows()) {
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
return; // we're ignoring this invalid row
} else {
throw e;
}
}
if(config.getGranularitySpec().bucketInterval(new DateTime(inputRow.getTimestampFromEpoch())).isPresent()) {
innerMap(inputRow, value, context);
}
}
catch (RuntimeException e) {
throw new RE(e, "Failure on row[%s]", value);
}
}
abstract protected void innerMap(InputRow inputRow, Text text, Context context)
throws IOException, InterruptedException;
}

View File

@ -19,31 +19,25 @@
package com.metamx.druid.indexer;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.google.common.primitives.Longs;
import com.metamx.common.ISE;
import com.metamx.common.RE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ParserUtils;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.indexer.rollup.DataRollupSpec;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.input.InputRow;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -53,13 +47,11 @@ import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@ -68,7 +60,6 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
@ -78,7 +69,6 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@ -127,7 +117,7 @@ public class IndexGeneratorJob implements Jobby
job.setMapperClass(IndexGeneratorMapper.class);
job.setMapOutputValueClass(Text.class);
SortableBytes.useSortableBytesAsKey(job);
SortableBytes.useSortableBytesAsMapOutputKey(job);
job.setNumReduceTasks(Iterables.size(config.getAllBuckets()));
job.setPartitionerClass(IndexGeneratorPartitioner.class);
@ -144,7 +134,7 @@ public class IndexGeneratorJob implements Jobby
job.setJarByClass(IndexGeneratorJob.class);
job.submit();
log.info("Job submitted, status available at %s", job.getTrackingURL());
log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
boolean success = job.waitForCompletion(true);
@ -159,75 +149,29 @@ public class IndexGeneratorJob implements Jobby
}
}
public static class IndexGeneratorMapper extends Mapper<LongWritable, Text, BytesWritable, Text>
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
{
private HadoopDruidIndexerConfig config;
private Parser<String, Object> parser;
private Function<String, DateTime> timestampConverter;
@Override
protected void setup(Context context)
throws IOException, InterruptedException
{
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
parser = config.getDataSpec().getParser();
timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat());
}
@Override
protected void map(
LongWritable key, Text value, Context context
protected void innerMap(
InputRow inputRow,
Text text,
Context context
) throws IOException, InterruptedException
{
// Group by bucket, sort by timestamp
final Optional<Bucket> bucket = getConfig().getBucket(inputRow);
try {
final Map<String, Object> values = parser.parse(value.toString());
final String tsStr = (String) values.get(config.getTimestampColumnName());
final DateTime timestamp;
try {
timestamp = timestampConverter.apply(tsStr);
}
catch (IllegalArgumentException e) {
if (config.isIgnoreInvalidRows()) {
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
return; // we're ignoring this invalid row
} else {
throw e;
}
}
Optional<Bucket> bucket = config.getBucket(
Maps.transformEntries(
values,
new Maps.EntryTransformer<String, Object, String>()
{
@Override
public String transformEntry(@Nullable String key, @Nullable Object value)
{
if (key.equalsIgnoreCase(config.getTimestampColumnName())) {
return timestamp.toString();
}
return value.toString();
}
}
)
);
if (bucket.isPresent()) {
// Group by bucket, sort by timestamp
context.write(
new SortableBytes(
bucket.get().toGroupKey(),
Longs.toByteArray(timestamp.getMillis())
).toBytesWritable(),
value
);
}
}
catch (RuntimeException e) {
throw new RE(e, "Failure on row[%s]", value);
if(!bucket.isPresent()) {
throw new ISE("WTF?! No bucket found for row: %s", inputRow);
}
context.write(
new SortableBytes(
bucket.get().toGroupKey(),
Longs.toByteArray(inputRow.getTimestampFromEpoch())
).toBytesWritable(),
text
);
}
}
@ -253,8 +197,7 @@ public class IndexGeneratorJob implements Jobby
{
private HadoopDruidIndexerConfig config;
private List<String> metricNames = Lists.newArrayList();
private Function<String, DateTime> timestampConverter;
private Parser parser;
private StringInputRowParser parser;
@Override
protected void setup(Context context)
@ -265,8 +208,8 @@ public class IndexGeneratorJob implements Jobby
for (AggregatorFactory factory : config.getRollupSpec().getAggs()) {
metricNames.add(factory.getName().toLowerCase());
}
timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat());
parser = config.getDataSpec().getParser();
parser = config.getParser();
}
@Override
@ -299,32 +242,10 @@ public class IndexGeneratorJob implements Jobby
for (final Text value : values) {
context.progress();
Map<String, Object> event = parser.parse(value.toString());
final long timestamp = timestampConverter.apply((String) event.get(config.getTimestampColumnName()))
.getMillis();
List<String> dimensionNames =
config.getDataSpec().hasCustomDimensions() ?
config.getDataSpec().getDimensions() :
Lists.newArrayList(
FunctionalIterable.create(event.keySet())
.filter(
new Predicate<String>()
{
@Override
public boolean apply(@Nullable String input)
{
return !(metricNames.contains(input.toLowerCase())
|| config.getTimestampColumnName()
.equalsIgnoreCase(input));
}
}
)
);
allDimensionNames.addAll(dimensionNames);
final InputRow inputRow = parser.parse(value.toString());
allDimensionNames.addAll(inputRow.getDimensions());
int numRows = index.add(
new MapBasedInputRow(timestamp, dimensionNames, event)
);
int numRows = index.add(inputRow);
++lineCount;
if (numRows >= rollupSpec.rowFlushBoundary) {

View File

@ -102,7 +102,7 @@ public class SortableBytes
);
}
public static void useSortableBytesAsKey(Job job)
public static void useSortableBytesAsMapOutputKey(Job job)
{
job.setMapOutputKeyClass(BytesWritable.class);
job.setGroupingComparatorClass(SortableBytesGroupingComparator.class);

View File

@ -20,6 +20,9 @@
package com.metamx.druid.indexer.granularity;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.Granularity;
import com.metamx.common.guava.Comparators;
@ -35,47 +38,47 @@ import java.util.TreeSet;
public class UniformGranularitySpec implements GranularitySpec
{
final private Granularity granularity;
final private List<Interval> intervals;
final private List<Interval> inputIntervals;
final private ArbitraryGranularitySpec wrappedSpec;
@JsonCreator
public UniformGranularitySpec(
@JsonProperty("gran") Granularity granularity,
@JsonProperty("intervals") List<Interval> intervals
@JsonProperty("intervals") List<Interval> inputIntervals
)
{
List<Interval> granularIntervals = Lists.newArrayList();
for (Interval inputInterval : inputIntervals) {
Iterables.addAll(granularIntervals, granularity.getIterable(inputInterval));
}
this.granularity = granularity;
this.intervals = intervals;
this.inputIntervals = ImmutableList.copyOf(inputIntervals);
this.wrappedSpec = new ArbitraryGranularitySpec(granularIntervals);
}
@Override
public SortedSet<Interval> bucketIntervals()
{
final TreeSet<Interval> retVal = Sets.newTreeSet(Comparators.intervals());
for (Interval interval : intervals) {
for (Interval segmentInterval : granularity.getIterable(interval)) {
retVal.add(segmentInterval);
}
}
return retVal;
return wrappedSpec.bucketIntervals();
}
@Override
public Optional<Interval> bucketInterval(DateTime dt)
{
return Optional.of(granularity.bucket(dt));
return wrappedSpec.bucketInterval(dt);
}
@JsonProperty
@JsonProperty("gran")
public Granularity getGranularity()
{
return granularity;
}
@JsonProperty
@JsonProperty("intervals")
public Iterable<Interval> getIntervals()
{
return intervals;
return inputIntervals;
}
}

View File

@ -0,0 +1,52 @@
package com.metamx.druid.indexer.partitions;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonProperty;
import javax.annotation.Nullable;
public class PartitionsSpec
{
@Nullable
private final String partitionDimension;
private final long targetPartitionSize;
private final boolean assumeGrouped;
public PartitionsSpec(
@JsonProperty("partitionDimension") @Nullable String partitionDimension,
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
)
{
this.partitionDimension = partitionDimension;
this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
}
@JsonIgnore
public boolean isDeterminingPartitions()
{
return targetPartitionSize > 0;
}
@JsonProperty
@Nullable
public String getPartitionDimension()
{
return partitionDimension;
}
@JsonProperty
public long getTargetPartitionSize()
{
return targetPartitionSize;
}
@JsonProperty
public boolean isAssumeGrouped()
{
return assumeGrouped;
}
}

View File

@ -22,6 +22,7 @@ package com.metamx.druid.indexer;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.indexer.partitions.PartitionsSpec;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.Interval;
@ -67,7 +68,7 @@ public class HadoopDruidIndexerConfigTest
}
@Test
public void testIntervalsAndSegmentGranularity() {
public void testGranularitySpecLegacy() {
// Deprecated and replaced by granularitySpec, but still supported
final HadoopDruidIndexerConfig cfg;
@ -98,9 +99,8 @@ public class HadoopDruidIndexerConfigTest
);
}
@Test
public void testCmdlineAndSegmentGranularity() {
public void testGranularitySpecPostConstructorIntervals() {
// Deprecated and replaced by granularitySpec, but still supported
final HadoopDruidIndexerConfig cfg;
@ -133,7 +133,7 @@ public class HadoopDruidIndexerConfigTest
}
@Test
public void testInvalidCombination() {
public void testInvalidGranularityCombination() {
boolean thrown = false;
try {
final HadoopDruidIndexerConfig cfg = jsonMapper.readValue(
@ -154,4 +154,160 @@ public class HadoopDruidIndexerConfigTest
Assert.assertTrue("Exception thrown", thrown);
}
@Test
public void testPartitionsSpecNoPartitioning() {
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonMapper.readValue(
"{}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
false
);
}
@Test
public void testPartitionsSpecAutoDimension() {
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonMapper.readValue(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100"
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getPartitionDimension",
partitionsSpec.getPartitionDimension(),
null
);
}
@Test
public void testPartitionsSpecSpecificDimension() {
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonMapper.readValue(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100,"
+ " \"partitionDimension\":\"foo\""
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getPartitionDimension",
partitionsSpec.getPartitionDimension(),
"foo"
);
}
@Test
public void testPartitionsSpecLegacy() {
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonMapper.readValue(
"{"
+ "\"targetPartitionSize\":100,"
+ "\"partitionDimension\":\"foo\""
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getPartitionDimension",
partitionsSpec.getPartitionDimension(),
"foo"
);
}
@Test
public void testInvalidPartitionsCombination() {
boolean thrown = false;
try {
final HadoopDruidIndexerConfig cfg = jsonMapper.readValue(
"{"
+ "\"targetPartitionSize\":100,"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100"
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
thrown = true;
}
Assert.assertTrue("Exception thrown", thrown);
}
}

View File

@ -69,6 +69,12 @@ public class ArbitraryGranularityTest
spec.bucketInterval(new DateTime("2012-01-03T01Z"))
);
Assert.assertEquals(
"2012-01-04T01Z",
Optional.<Interval>absent(),
spec.bucketInterval(new DateTime("2012-01-04T01Z"))
);
Assert.assertEquals(
"2012-01-07T23:59:59.999Z",
Optional.of(new Interval("2012-01-07T00Z/2012-01-08T00Z")),

View File

@ -72,6 +72,12 @@ public class UniformGranularityTest
spec.bucketInterval(new DateTime("2012-01-03T01Z"))
);
Assert.assertEquals(
"2012-01-04T01Z",
Optional.<Interval>absent(),
spec.bucketInterval(new DateTime("2012-01-04T01Z"))
);
Assert.assertEquals(
"2012-01-07T23:59:59.999Z",
Optional.of(new Interval("2012-01-07T00Z/2012-01-08T00Z")),