1) Eliminate the need for SingleSegmentLoader

2) Set up the configuration glue for ServerInventoryViews and DataSegmentAnnouncer (see the config sketch after the change summary below)
3) Make processes run and work again!
cheddar 2013-08-02 17:05:01 -07:00
parent f593b23aac
commit d66af7625c
24 changed files with 548 additions and 253 deletions
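
Point 2 above is wired through Guice and Jackson: CoordinatorModule binds ServerInventoryViewProvider and HistoricalModule binds DataSegmentAnnouncerProvider, both via JsonConfigProvider under the "druid.announcer" prefix, and Jackson selects the concrete implementation from a "type" field ("legacy" is the default, "batch" the alternative). BatchDataSegmentAnnouncerConfig carries the batch tuning knobs (segmentsPerNode, default 50; maxBytesPerNode, default 512 * 1024). A minimal properties sketch, assuming JsonConfigProvider maps dotted properties under that prefix onto the JSON fields — the exact property spelling is not shown in this commit:

    # choose the batch announcer / inventory view; omit for the legacy default
    druid.announcer.type=batch
    # optional batch tuning, validated as segmentsPerNode >= 1 and 1024 <= maxBytesPerNode <= 1048576
    druid.announcer.segmentsPerNode=50
    druid.announcer.maxBytesPerNode=524288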

View File

@ -42,7 +42,6 @@ import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.client.SingleServerInventoryView;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer;
import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata;
@ -53,9 +52,9 @@ import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.guice.JsonConfigurator;
import com.metamx.druid.http.log.NoopRequestLogger;
import com.metamx.druid.http.log.RequestLogger;
import com.metamx.druid.initialization.BatchDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
@ -456,23 +455,25 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s"));
lifecycle.addManagedInstance(announcer);
final ZkDataSegmentAnnouncerConfig config = getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class);
final String announcerType = config.getAnnouncerType();
final BatchDataSegmentAnnouncerConfig config = getConfigFactory().build(BatchDataSegmentAnnouncerConfig.class);
final String announcerType = "legacy";
final DataSegmentAnnouncer dataSegmentAnnouncer;
if ("batch".equalsIgnoreCase(announcerType)) {
dataSegmentAnnouncer = new BatchDataSegmentAnnouncer(
getDruidServerMetadata(),
config,
getZkPaths(),
announcer,
getJsonMapper()
);
} else if ("legacy".equalsIgnoreCase(announcerType)) {
dataSegmentAnnouncer = new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Arrays.<AbstractDataSegmentAnnouncer>asList(
Arrays.<DataSegmentAnnouncer>asList(
new BatchDataSegmentAnnouncer(
getDruidServerMetadata(),
config,
getZkPaths(),
announcer,
getJsonMapper()
),

View File

@ -51,7 +51,8 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
{
super(
log,
zkPaths,
zkPaths.getAnnouncementsPath(),
zkPaths.getLiveSegmentsPath(),
curator,
jsonMapper,
new TypeReference<Set<DataSegment>>(){}

View File

@ -0,0 +1,31 @@
package com.metamx.druid.client;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import javax.validation.constraints.NotNull;
/**
*/
public class BatchServerInventoryViewProvider implements ServerInventoryViewProvider
{
@JacksonInject
@NotNull
private ZkPathsConfig zkPaths = null;
@JacksonInject
@NotNull
private CuratorFramework curator = null;
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
@Override
public ServerInventoryView get()
{
return new BatchServerInventoryView(zkPaths, curator, jsonMapper);
}
}

View File

@ -31,12 +31,10 @@ import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.curator.inventory.CuratorInventoryManager;
import com.metamx.druid.curator.inventory.CuratorInventoryManagerStrategy;
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger;
import org.apache.curator.framework.CuratorFramework;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@ -54,11 +52,10 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();
private final Map<String, Integer> removedSegments = new MapMaker().makeMap();
public ServerInventoryView(
final EmittingLogger log,
final ZkPathsConfig zkPaths,
final String announcementsPath,
final String inventoryPath,
final CuratorFramework curator,
final ObjectMapper jsonMapper,
final TypeReference<InventoryType> typeReference
@ -72,13 +69,13 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
@Override
public String getContainerPath()
{
return zkPaths.getAnnouncementsPath();
return announcementsPath;
}
@Override
public String getInventoryPath()
{
return zkPaths.getServedSegmentsPath();
return inventoryPath;
}
},
Execs.singleThreaded("ServerInventoryView-%s"),
@ -174,26 +171,6 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
);
}
public int lookupSegmentLifetime(DataSegment segment)
{
Integer lifetime = removedSegments.get(segment.getIdentifier());
return (lifetime == null) ? 0 : lifetime;
}
public void decrementRemovedSegmentsLifetime()
{
for (Iterator<Map.Entry<String, Integer>> mapIter = removedSegments.entrySet().iterator(); mapIter.hasNext(); ) {
Map.Entry<String, Integer> segment = mapIter.next();
int lifetime = segment.getValue() - 1;
if (lifetime < 0) {
mapIter.remove();
} else {
segment.setValue(lifetime);
}
}
}
@LifecycleStart
public void start() throws Exception
{
@ -292,7 +269,6 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
final DataSegment inventory
)
{
/* TODO
log.info("Server[%s] added segment[%s]", container.getName(), inventory.getIdentifier());
if (container.getSegment(inventory.getIdentifier()) != null) {
@ -317,12 +293,10 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
}
}
);
*/
}
protected void removeSingleInventory(final DruidServer container, String inventoryKey)
{
/* TODO
log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
final DataSegment segment = container.getSegment(inventoryKey);
@ -348,9 +322,6 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
}
}
);
removedSegments.put(inventoryKey, config.getRemovedSegmentLifetime());
*/
}
protected abstract DruidServer addInnerInventory(

View File

@ -0,0 +1,16 @@
package com.metamx.druid.client;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.inject.Provider;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleServerInventoryProvider.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "legacy", value = SingleServerInventoryProvider.class),
@JsonSubTypes.Type(name = "batch", value = BatchServerInventoryViewProvider.class)
})
public interface ServerInventoryViewProvider extends Provider<ServerInventoryView>
{
}

View File

@ -0,0 +1,31 @@
package com.metamx.druid.client;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import javax.validation.constraints.NotNull;
/**
*/
public class SingleServerInventoryProvider implements ServerInventoryViewProvider
{
@JacksonInject
@NotNull
private ZkPathsConfig zkPaths = null;
@JacksonInject
@NotNull
private CuratorFramework curator = null;
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
@Override
public ServerInventoryView get()
{
return new SingleServerInventoryView(zkPaths, curator, jsonMapper);
}
}

View File

@ -43,7 +43,8 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
{
super(
log,
zkPaths,
zkPaths.getAnnouncementsPath(),
zkPaths.getServedSegmentsPath(),
curator,
jsonMapper,
new TypeReference<DataSegment>(){}

View File

@ -24,11 +24,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.BatchDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths;
import org.joda.time.DateTime;
@ -43,7 +45,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{
private static final Logger log = new Logger(BatchDataSegmentAnnouncer.class);
private final ZkDataSegmentAnnouncerConfig config;
private final BatchDataSegmentAnnouncerConfig config;
private final Announcer announcer;
private final ObjectMapper jsonMapper;
private final String liveSegmentLocation;
@ -51,27 +53,29 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
private final Set<SegmentZNode> availableZNodes = Sets.newHashSet();
private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap();
@Inject
public BatchDataSegmentAnnouncer(
DruidServerMetadata server,
ZkDataSegmentAnnouncerConfig config,
BatchDataSegmentAnnouncerConfig config,
ZkPathsConfig zkPaths,
Announcer announcer,
ObjectMapper jsonMapper
)
{
super(server, config, announcer, jsonMapper);
super(server, zkPaths, announcer, jsonMapper);
this.config = config;
this.announcer = announcer;
this.jsonMapper = jsonMapper;
this.liveSegmentLocation = ZKPaths.makePath(config.getLiveSegmentsPath(), server.getName());
this.liveSegmentLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), server.getName());
}
@Override
public void announceSegment(DataSegment segment) throws IOException
{
int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
if (newBytesLen > config.getMaxNumBytes()) {
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
if (newBytesLen > config.getMaxBytesPerNode()) {
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxBytesPerNode());
}
// create new batch
@ -88,7 +92,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
boolean done = false;
while (iter.hasNext() && !done) {
SegmentZNode availableZNode = iter.next();
if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
if (availableZNode.getBytes().length + newBytesLen < config.getMaxBytesPerNode()) {
availableZNode.addSegment(segment);
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
@ -132,11 +136,11 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
for (DataSegment segment : segments) {
int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
if (newBytesLen > config.getMaxNumBytes()) {
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
if (newBytesLen > config.getMaxBytesPerNode()) {
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxBytesPerNode());
}
if (count >= config.getSegmentsPerNode() || byteSize + newBytesLen > config.getMaxNumBytes()) {
if (count >= config.getSegmentsPerNode() || byteSize + newBytesLen > config.getMaxBytesPerNode()) {
segmentZNode.addSegments(batch);
announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());

View File

@ -0,0 +1,20 @@
package com.metamx.druid.coordination;
import com.fasterxml.jackson.annotation.JacksonInject;
import javax.validation.constraints.NotNull;
/**
*/
public class BatchDataSegmentAnnouncerProvider implements DataSegmentAnnouncerProvider
{
@JacksonInject
@NotNull
private BatchDataSegmentAnnouncer batchAnnouncer = null;
@Override
public DataSegmentAnnouncer get()
{
return batchAnnouncer;
}
}

View File

@ -0,0 +1,16 @@
package com.metamx.druid.coordination;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.inject.Provider;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LegacyDataSegmentAnnouncerProvider.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "legacy", value = LegacyDataSegmentAnnouncerProvider.class),
@JsonSubTypes.Type(name = "batch", value = BatchDataSegmentAnnouncerProvider.class)
})
public interface DataSegmentAnnouncerProvider extends Provider<DataSegmentAnnouncer>
{
}

View File

@ -0,0 +1,32 @@
package com.metamx.druid.coordination;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.metamx.common.lifecycle.Lifecycle;
import javax.validation.constraints.NotNull;
import java.util.Arrays;
/**
*/
public class LegacyDataSegmentAnnouncerProvider implements DataSegmentAnnouncerProvider
{
@JacksonInject
@NotNull
private SingleDataSegmentAnnouncer singleAnnouncer = null;
@JacksonInject
@NotNull
private BatchDataSegmentAnnouncer batchAnnouncer = null;
@JacksonInject
@NotNull
private Lifecycle lifecycle = null;
@Override
public DataSegmentAnnouncer get()
{
return new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Arrays.<DataSegmentAnnouncer>asList(singleAnnouncer, batchAnnouncer)
);
}
}

View File

@ -19,8 +19,6 @@
package com.metamx.druid.coordination;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.client.DataSegment;
import java.io.IOException;
@ -30,31 +28,15 @@ import java.io.IOException;
*/
public class MultipleDataSegmentAnnouncerDataSegmentAnnouncer implements DataSegmentAnnouncer
{
private final Iterable<AbstractDataSegmentAnnouncer> dataSegmentAnnouncers;
private final Iterable<DataSegmentAnnouncer> dataSegmentAnnouncers;
public MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Iterable<AbstractDataSegmentAnnouncer> dataSegmentAnnouncers
Iterable<DataSegmentAnnouncer> dataSegmentAnnouncers
)
{
this.dataSegmentAnnouncers = dataSegmentAnnouncers;
}
@LifecycleStart
public void start()
{
for (AbstractDataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
dataSegmentAnnouncer.start();
}
}
@LifecycleStop
public void stop()
{
for (AbstractDataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
dataSegmentAnnouncer.stop();
}
}
@Override
public void announceSegment(DataSegment segment) throws IOException
{

View File

@ -0,0 +1,30 @@
package com.metamx.druid.initialization;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
/**
*/
public class BatchDataSegmentAnnouncerConfig
{
@JsonProperty
@Min(1)
private int segmentsPerNode = 50;
@JsonProperty
@Max(1024 * 1024)
@Min(1024)
private long maxBytesPerNode = 512 * 1024;
public int getSegmentsPerNode()
{
return segmentsPerNode;
}
public long getMaxBytesPerNode()
{
return maxBytesPerNode;
}
}

View File

@ -1,21 +0,0 @@
package com.metamx.druid.initialization;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
public abstract class ZkDataSegmentAnnouncerConfig extends ZkPathsConfig
{
@Config("druid.zk.segmentsPerNode")
@Default("50")
public abstract int getSegmentsPerNode();
@Config("druid.zk.maxNumBytesPerNode")
@Default("512000")
public abstract long getMaxNumBytes();
@Config("druid.announcer.type")
@Default("legacy")
public abstract String getAnnouncerType();
}

View File

@ -30,7 +30,7 @@ import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.BatchDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
@ -92,30 +92,20 @@ public class BatchServerInventoryViewTest
"type",
"tier"
),
new ZkDataSegmentAnnouncerConfig()
new BatchDataSegmentAnnouncerConfig()
{
@Override
public String getZkBasePath()
{
return testBasePath;
}
@Override
public int getSegmentsPerNode()
{
return 50;
}
},
new ZkPathsConfig()
{
@Override
public long getMaxNumBytes()
public String getZkBasePath()
{
return 100000;
}
@Override
public String getAnnouncerType()
{
return "batch";
return testBasePath;
}
},
announcer,

View File

@ -28,7 +28,8 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.BatchDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
@ -92,30 +93,20 @@ public class BatchDataSegmentAnnouncerTest
"type",
"tier"
),
new ZkDataSegmentAnnouncerConfig()
new BatchDataSegmentAnnouncerConfig()
{
@Override
public String getZkBasePath()
{
return testBasePath;
}
@Override
public int getSegmentsPerNode()
{
return 50;
}
},
new ZkPathsConfig()
{
@Override
public long getMaxNumBytes()
public String getZkBasePath()
{
return 100000;
}
@Override
public String getAnnouncerType()
{
return "batch";
return testBasePath;
}
},
announcer,

View File

@ -1,11 +1,11 @@
package com.metamx.druid.guice;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotatedField;
import com.fasterxml.jackson.databind.introspect.BeanPropertyDefinition;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
@ -35,8 +35,6 @@ public class JsonConfigurator
{
private static final Logger log = new Logger(JsonConfigurator.class);
private static final Joiner JOINER = Joiner.on(".");
private final ObjectMapper jsonMapper;
private final Validator validator;
@ -86,7 +84,7 @@ public class JsonConfigurator
List<String> messages = Lists.newArrayList();
for (ConstraintViolation<T> violation : violations) {
List<String> pathParts = Lists.newArrayList();
String path = "";
try {
Class<?> beanClazz = violation.getRootBeanClass();
final Iterator<Path.Node> iter = violation.getPropertyPath().iterator();
@ -95,8 +93,21 @@ public class JsonConfigurator
if (next.getKind() == ElementKind.PROPERTY) {
final String fieldName = next.getName();
final Field theField = beanClazz.getDeclaredField(fieldName);
if (theField.getAnnotation(JacksonInject.class) != null) {
path = String.format(" -- Injected field[%s] not bound!?", fieldName);
break;
}
JsonProperty annotation = theField.getAnnotation(JsonProperty.class);
pathParts.add(annotation == null || Strings.isNullOrEmpty(annotation.value()) ? fieldName : annotation.value());
final boolean noAnnotationValue = annotation == null || Strings.isNullOrEmpty(annotation.value());
final String pathPart = noAnnotationValue ? fieldName : annotation.value();
if (path.isEmpty()) {
path += pathPart;
}
else {
path += "." + pathPart;
}
}
}
}
@ -104,7 +115,7 @@ public class JsonConfigurator
throw Throwables.propagate(e);
}
messages.add(String.format("%s - %s", JOINER.join(pathParts), violation.getMessage()));
messages.add(String.format("%s - %s", path, violation.getMessage()));
}
throw new ProvisionException(

View File

@ -9,6 +9,7 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.ServerInventoryViewProvider;
import com.metamx.druid.client.indexing.IndexingService;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.client.indexing.IndexingServiceSelector;
@ -48,10 +49,12 @@ public class CoordinatorModule implements Module
JsonConfigProvider.bind(binder, "druid.manager.rules", DatabaseRuleManagerConfig.class);
binder.bind(InventoryView.class).to(ServerInventoryView.class);
binder.bind(ServerInventoryView.class);
binder.bind(RedirectServlet.class).in(LazySingleton.class);
binder.bind(RedirectFilter.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class);
binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class);
binder.bind(DatabaseSegmentManager.class)
.toProvider(DatabaseSegmentManagerProvider.class)
.in(ManageLifecycle.class);

View File

@ -15,21 +15,24 @@ import com.metamx.druid.Query;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncerProvider;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
import com.metamx.druid.coordination.ServerManager;
import com.metamx.druid.coordination.SingleDataSegmentAnnouncer;
import com.metamx.druid.coordination.ZkCoordinator;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.guice.annotations.Global;
import com.metamx.druid.guice.annotations.Processing;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.druid.initialization.BatchDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.loading.BaseSegmentLoader;
import com.metamx.druid.loading.DataSegmentPuller;
import com.metamx.druid.loading.HdfsDataSegmentPuller;
import com.metamx.druid.loading.LocalDataSegmentPuller;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
import com.metamx.druid.loading.OmniSegmentLoader;
import com.metamx.druid.loading.QueryableIndexFactory;
import com.metamx.druid.loading.S3CredentialsConfig;
import com.metamx.druid.loading.S3DataSegmentPuller;
@ -84,7 +87,7 @@ public class HistoricalModule implements Module
binder.bind(ServerManager.class).in(LazySingleton.class);
binder.bind(SegmentLoader.class).to(BaseSegmentLoader.class).in(LazySingleton.class);
binder.bind(SegmentLoader.class).to(OmniSegmentLoader.class).in(LazySingleton.class);
binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class);
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
@ -120,36 +123,11 @@ public class HistoricalModule implements Module
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
binder.bind(DataSegmentAnnouncer.class)
.to(MultipleDataSegmentAnnouncerDataSegmentAnnouncer.class)
.in(ManageLifecycleLast.class);
}
private void bindDeepStorageS3(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.s3", S3CredentialsConfig.class);
}
private void bindDeepStorageHdfs(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class);
binder.bind(Configuration.class).toInstance(new Configuration());
}
private void bindDeepStorageCassandra(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("c*").to(CassandraDataSegmentPuller.class).in(LazySingleton.class);
ConfigProvider.bind(binder, CassandraDataSegmentConfig.class);
JsonConfigProvider.bind(binder, "druid.announcer", BatchDataSegmentAnnouncerConfig.class);
JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class);
binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class);
binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleLast.class);
binder.bind(SingleDataSegmentAnnouncer.class).in(ManageLifecycleLast.class);
}
@Provides @LazySingleton
@ -225,6 +203,33 @@ public class HistoricalModule implements Module
return new IntermediateProcessingBufferPool(config.intermediateComputeSizeBytes());
}
private static void bindDeepStorageS3(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.s3", S3CredentialsConfig.class);
}
private static void bindDeepStorageHdfs(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class);
binder.bind(Configuration.class).toInstance(new Configuration());
}
private static void bindDeepStorageCassandra(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("c*").to(CassandraDataSegmentPuller.class).in(LazySingleton.class);
ConfigProvider.bind(binder, CassandraDataSegmentConfig.class);
}
private static class IntermediateProcessingBufferPool extends StupidPool<ByteBuffer>
{
private static final Logger log = new Logger(IntermediateProcessingBufferPool.class);

View File

@ -1,62 +0,0 @@
package com.metamx.druid.loading;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.Segment;
import java.util.Map;
/**
*/
public class BaseSegmentLoader implements SegmentLoader
{
private final Map<String, DataSegmentPuller> pullers;
private final QueryableIndexFactory factory;
private final Supplier<SegmentLoaderConfig> config;
@Inject
public BaseSegmentLoader(
Map<String, DataSegmentPuller> pullers,
QueryableIndexFactory factory,
Supplier<SegmentLoaderConfig> config
)
{
this.pullers = pullers;
this.factory = factory;
this.config = config;
}
@Override
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
{
return getLoader(segment.getLoadSpec()).isSegmentLoaded(segment);
}
@Override
public Segment getSegment(DataSegment segment) throws SegmentLoadingException
{
return getLoader(segment.getLoadSpec()).getSegment(segment);
}
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
getLoader(segment.getLoadSpec()).cleanup(segment);
}
private SegmentLoader getLoader(Map<String, Object> loadSpec) throws SegmentLoadingException
{
String type = MapUtils.getString(loadSpec, "type");
DataSegmentPuller loader = pullers.get(type);
if (loader == null) {
throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, pullers.keySet());
}
// TODO: SingleSegmentLoader should die when Guice goes out. The logic should just be in this class.
return new SingleSegmentLoader(loader, factory, config.get());
}
}

View File

@ -0,0 +1,250 @@
package com.metamx.druid.loading;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
public class OmniSegmentLoader implements SegmentLoader
{
private static final Logger log = new Logger(OmniSegmentLoader.class);
private final Map<String, DataSegmentPuller> pullers;
private final QueryableIndexFactory factory;
private final SegmentLoaderConfig config;
private final List<StorageLocation> locations;
@Inject
public OmniSegmentLoader(
Map<String, DataSegmentPuller> pullers,
QueryableIndexFactory factory,
SegmentLoaderConfig config
)
{
this.pullers = pullers;
this.factory = factory;
this.config = config;
final ImmutableList.Builder<StorageLocation> locBuilder = ImmutableList.builder();
// TODO
// This is a really, really stupid way of getting this information. Splitting on commas and bars is error-prone
// We should instead switch it up to be a JSON Array of JSON Object or something and cool stuff like that
// But, that'll have to wait for some other day.
for (String dirSpec : config.getLocations().split(",")) {
String[] dirSplit = dirSpec.split("\\|");
if (dirSplit.length == 1) {
locBuilder.add(new StorageLocation(new File(dirSplit[0]), Integer.MAX_VALUE));
}
else if (dirSplit.length == 2) {
final Long maxSize = Longs.tryParse(dirSplit[1]);
if (maxSize == null) {
throw new IAE("Size of a local segment storage location must be an integral number, got[%s]", dirSplit[1]);
}
locBuilder.add(new StorageLocation(new File(dirSplit[0]), maxSize));
}
else {
throw new ISE(
"Unknown segment storage location[%s]=>[%s], config[%s].",
dirSplit.length, dirSpec, config.getLocations()
);
}
}
locations = locBuilder.build();
Preconditions.checkArgument(locations.size() > 0, "Must have at least one segment cache directory.");
log.info("Using storage locations[%s]", locations);
}
@Override
public boolean isSegmentLoaded(final DataSegment segment)
{
return findStorageLocationIfLoaded(segment) != null;
}
public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
if (localStorageDir.exists()) {
return location;
}
}
return null;
}
@Override
public Segment getSegment(DataSegment segment) throws SegmentLoadingException
{
File segmentFiles = loadSegmentFiles(segment);
final QueryableIndex index = factory.factorize(segmentFiles);
return new QueryableIndexSegment(segment.getIdentifier(), index);
}
public File loadSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
StorageLocation loc = findStorageLocationIfLoaded(segment);
final File retVal;
if (loc == null) {
Iterator<StorageLocation> locIter = locations.iterator();
loc = locIter.next();
while (locIter.hasNext()) {
loc = loc.mostEmpty(locIter.next());
}
if (!loc.canHandle(segment.getSize())) {
throw new ISE(
"Segment[%s:%,d] too large for storage[%s:%,d].",
segment.getIdentifier(), segment.getSize(), loc.getPath(), loc.available()
);
}
File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
if (!storageDir.mkdirs()) {
log.debug("Unable to make parent file[%s]", storageDir);
}
getPuller(segment.getLoadSpec()).getSegmentFiles(segment, storageDir);
loc.addSegment(segment);
retVal = storageDir;
}
else {
retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
}
loc.addSegment(segment);
return retVal;
}
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
if (!config.isDeleteOnRemove()) {
return;
}
StorageLocation loc = findStorageLocationIfLoaded(segment);
if (loc == null) {
log.info("Asked to cleanup something[%s] that didn't exist. Skipping.", segment);
return;
}
try {
File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
log.info("Deleting directory[%s]", cacheFile);
FileUtils.deleteDirectory(cacheFile);
loc.removeSegment(segment);
}
catch (IOException e) {
throw new SegmentLoadingException(e, e.getMessage());
}
}
private DataSegmentPuller getPuller(Map<String, Object> loadSpec) throws SegmentLoadingException
{
String type = MapUtils.getString(loadSpec, "type");
DataSegmentPuller loader = pullers.get(type);
if (loader == null) {
throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, pullers.keySet());
}
return loader;
}
private static class StorageLocation
{
private final File path;
private final long maxSize;
private final Set<DataSegment> segments;
private volatile long currSize = 0;
StorageLocation(
File path,
long maxSize
)
{
this.path = path;
this.maxSize = maxSize;
this.segments = Sets.newHashSet();
}
private File getPath()
{
return path;
}
private Long getMaxSize()
{
return maxSize;
}
private synchronized void addSegment(DataSegment segment)
{
if (! segments.add(segment)) {
currSize += segment.getSize();
}
}
private synchronized void removeSegment(DataSegment segment)
{
if (segments.remove(segment)) {
currSize -= segment.getSize();
}
}
private boolean canHandle(long size)
{
return available() > size;
}
private synchronized long available()
{
return maxSize - currSize;
}
private StorageLocation mostEmpty(StorageLocation other)
{
return available() > other.available() ? this : other;
}
@Override
public String toString()
{
return "StorageLocation{" +
"path=" + path +
", maxSize=" + maxSize +
'}';
}
}
}

View File

@ -30,7 +30,7 @@ public class SegmentLoaderConfig
{
@JsonProperty
@NotNull
private String locations;
private String locations = null;
@JsonProperty("deleteOnRemove")
private boolean deleteOnRemove = true;
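
For reference, OmniSegmentLoader above parses SegmentLoaderConfig.locations as a comma-separated list of cache directories, each optionally followed by |maxSize in bytes (a bare path gets Integer.MAX_VALUE, and the size must parse as a long). A hedged sketch of the value format only; the property key that SegmentLoaderConfig is bound to is not part of this commit, so the name below is purely illustrative:

    # two cache directories, the first capped at roughly 10 GB, the second uncapped
    druid.segmentCache.locations=/mnt/druid/segmentCache|10000000000,/mnt2/druid/segmentCache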

View File

@ -40,7 +40,9 @@ import java.util.List;
import java.util.Set;
/**
* TODO: Kill this along with the Guicification of the IndexingService stuff
*/
@Deprecated
public class SingleSegmentLoader implements SegmentLoader
{
private static final Logger log = new Logger(SingleSegmentLoader.class);
@ -92,21 +94,21 @@ public class SingleSegmentLoader implements SegmentLoader
}
@Override
public boolean isSegmentLoaded(final DataSegment segment)
{
return findStorageLocationIfLoaded(segment) != null;
}
public boolean isSegmentLoaded(final DataSegment segment)
{
return findStorageLocationIfLoaded(segment) != null;
}
public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
if (localStorageDir.exists()) {
return location;
}
public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
if (localStorageDir.exists()) {
return location;
}
return null;
}
return null;
}
@Override
public Segment getSegment(DataSegment segment) throws SegmentLoadingException

View File

@ -214,16 +214,6 @@ public class DruidMaster
return loadStatus;
}
public int lookupSegmentLifetime(DataSegment segment)
{
return serverInventoryView.lookupSegmentLifetime(segment);
}
public void decrementRemovedSegmentsLifetime()
{
serverInventoryView.decrementRemovedSegmentsLifetime();
}
public void removeSegment(DataSegment segment)
{
log.info("Removing Segment[%s]", segment);
@ -763,7 +753,7 @@ public class DruidMaster
peon.stop();
}
decrementRemovedSegmentsLifetime();
// TODO: decrementRemovedSegmentsLifetime();
return params.buildFromExisting()
.withDruidCluster(cluster)