1) Remove SingleSegmentLoader and replace with OmniSegmentLoader

cheddar 2013-09-12 11:47:03 -05:00
parent b8bd19e87c
commit a2dcc45a8e
13 changed files with 67 additions and 379 deletions
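For orientation, this is roughly the SegmentLoader interface as it stands after this commit, reconstructed from the interface hunk further down in this diff (a readability sketch, not a verbatim copy of the file):

package io.druid.segment.loading;

import io.druid.segment.Segment;
import io.druid.timeline.DataSegment;

import java.io.File;

public interface SegmentLoader
{
  // unchanged: reports whether the segment already sits in a local cache location
  public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException;

  // unchanged: returns a queryable Segment backed by a QueryableIndex, pulling the files first if needed
  public Segment getSegment(DataSegment segment) throws SegmentLoadingException;

  // new in this commit: exposes the on-disk segment directory, which is what lets
  // TaskToolbox.getSegments() below delegate to any injected SegmentLoader
  public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;

  // unchanged: removes the segment from the local cache
  public void cleanup(DataSegment segment) throws SegmentLoadingException;
}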

View File

@ -31,14 +31,10 @@ import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.MMappedQueryableIndexFactory;
import io.druid.segment.loading.S3DataSegmentPuller;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.SingleSegmentLoader;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.io.File;
import java.util.List;
@ -53,13 +49,13 @@ public class TaskToolbox
private final Task task;
private final TaskActionClientFactory taskActionClientFactory;
private final ServiceEmitter emitter;
private final RestS3Service s3Client;
private final DataSegmentPusher segmentPusher;
private final DataSegmentKiller dataSegmentKiller;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final MonitorScheduler monitorScheduler;
private final SegmentLoader segmentLoader;
private final ObjectMapper objectMapper;
public TaskToolbox(
@ -67,13 +63,13 @@ public class TaskToolbox
Task task,
TaskActionClientFactory taskActionClientFactory,
ServiceEmitter emitter,
RestS3Service s3Client,
DataSegmentPusher segmentPusher,
DataSegmentKiller dataSegmentKiller,
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
MonitorScheduler monitorScheduler,
SegmentLoader segmentLoader,
ObjectMapper objectMapper
)
{
@ -81,13 +77,13 @@ public class TaskToolbox
this.task = task;
this.taskActionClientFactory = taskActionClientFactory;
this.emitter = emitter;
this.s3Client = s3Client;
this.segmentPusher = segmentPusher;
this.dataSegmentKiller = dataSegmentKiller;
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.monitorScheduler = monitorScheduler;
this.segmentLoader = segmentLoader;
this.objectMapper = objectMapper;
}
@ -144,22 +140,9 @@ public class TaskToolbox
public Map<DataSegment, File> getSegments(List<DataSegment> segments)
throws SegmentLoadingException
{
final SingleSegmentLoader loader = new SingleSegmentLoader(
new S3DataSegmentPuller(s3Client),
new MMappedQueryableIndexFactory(),
new SegmentLoaderConfig()
{
@Override
public String getLocations()
{
return new File(getTaskWorkDir(), "fetched_segments").toString();
}
}
);
Map<DataSegment, File> retVal = Maps.newLinkedHashMap();
for (DataSegment segment : segments) {
retVal.put(segment, loader.getSegmentFiles(segment));
retVal.put(segment, segmentLoader.getSegmentFiles(segment));
}
return retVal;
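This hunk is the behavioral core of the change on the indexing side: getSegments() no longer constructs an S3-specific SingleSegmentLoader inline (with its own puller, index factory, and "fetched_segments" cache directory) but defers to whatever SegmentLoader was injected into the toolbox. A minimal caller-side sketch, with the toolbox and segment list assumed to come from the surrounding task (names here are illustrative, not from this diff):

// Sketch only: fetch segment files through the injected loader; nothing S3-specific remains here.
static Map<DataSegment, File> fetchSegmentFiles(TaskToolbox toolbox, List<DataSegment> segments)
    throws SegmentLoadingException
{
  // Delegates to the bound SegmentLoader (e.g. OmniSegmentLoader) via the new getSegmentFiles() path.
  return toolbox.getSegments(segments);
}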

View File

@ -30,8 +30,8 @@ import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoader;
import io.druid.server.coordination.DataSegmentAnnouncer;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
/**
* Stuff that may be needed by a Task in order to conduct its business.
@ -41,13 +41,13 @@ public class TaskToolboxFactory
private final TaskConfig config;
private final TaskActionClientFactory taskActionClientFactory;
private final ServiceEmitter emitter;
private final RestS3Service s3Client;
private final DataSegmentPusher segmentPusher;
private final DataSegmentKiller dataSegmentKiller;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final MonitorScheduler monitorScheduler;
private final SegmentLoader segmentLoader;
private final ObjectMapper objectMapper;
@Inject
@ -55,26 +55,26 @@ public class TaskToolboxFactory
TaskConfig config,
TaskActionClientFactory taskActionClientFactory,
ServiceEmitter emitter,
RestS3Service s3Client,
DataSegmentPusher segmentPusher,
DataSegmentKiller dataSegmentKiller,
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
MonitorScheduler monitorScheduler,
SegmentLoader segmentLoader,
ObjectMapper objectMapper
)
{
this.config = config;
this.taskActionClientFactory = taskActionClientFactory;
this.emitter = emitter;
this.s3Client = s3Client;
this.segmentPusher = segmentPusher;
this.dataSegmentKiller = dataSegmentKiller;
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.monitorScheduler = monitorScheduler;
this.segmentLoader = segmentLoader;
this.objectMapper = objectMapper;
}
@ -85,13 +85,13 @@ public class TaskToolboxFactory
task,
taskActionClientFactory,
emitter,
s3Client,
segmentPusher,
dataSegmentKiller,
segmentAnnouncer,
newSegmentServerView,
queryRunnerFactoryConglomerate,
monitorScheduler,
segmentLoader,
objectMapper
);
}

View File

@ -118,7 +118,6 @@ public class TaskLifecycleTest
new TaskConfig(tmp.toString(), null, null, 50000),
tac,
newMockEmitter(),
null, // s3 client
new DataSegmentPusher()
{
@Override
@ -139,6 +138,7 @@ public class TaskLifecycleTest
null, // new segment server view
null, // query runner factory conglomerate corporation unionized collective
null, // monitor scheduler
null, // segment loader
new DefaultObjectMapper()
);

View File

@ -21,8 +21,6 @@ package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.HdfsDataSegmentPuller;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.OmniSegmentLoader;
@ -49,35 +47,35 @@ public class DataSegmentPullerModule implements Module
private static void bindDeepStorageLocal(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("local").to(LocalDataSegmentPuller.class).in(LazySingleton.class);
DruidBinders.dataSegmentPullerBinder(binder)
.addBinding("local")
.to(LocalDataSegmentPuller.class)
.in(LazySingleton.class);
}
private static void bindDeepStorageS3(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class);
DruidBinders.dataSegmentPullerBinder(binder)
.addBinding("s3_zip")
.to(S3DataSegmentPuller.class)
.in(LazySingleton.class);
}
private static void bindDeepStorageHdfs(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class);
DruidBinders.dataSegmentPullerBinder(binder)
.addBinding("hdfs")
.to(HdfsDataSegmentPuller.class)
.in(LazySingleton.class);
binder.bind(Configuration.class).toInstance(new Configuration());
}
private static void bindDeepStorageCassandra(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("c*").to(CassandraDataSegmentPuller.class).in(LazySingleton.class);
DruidBinders.dataSegmentPullerBinder(binder)
.addBinding("c*")
.to(CassandraDataSegmentPuller.class)
.in(LazySingleton.class);
ConfigProvider.bind(binder, CassandraDataSegmentConfig.class);
}
}

View File

@ -25,6 +25,7 @@ import com.google.inject.multibindings.MapBinder;
import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.segment.loading.DataSegmentPuller;
/**
*/
@ -49,4 +50,9 @@ public class DruidBinders
}
);
}
public static MapBinder<String, DataSegmentPuller> dataSegmentPullerBinder(Binder binder)
{
return MapBinder.newMapBinder(binder, String.class, DataSegmentPuller.class);
}
}
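The new helper gives every deep-storage module a single place to contribute a keyed DataSegmentPuller binding; Guice assembles those bindings into one Map<String, DataSegmentPuller> for the consumer to dispatch on. A hedged sketch of that dispatch, modeled on the getLoader() method of the DelegatingSegmentLoader deleted later in this diff rather than copied from OmniSegmentLoader itself (field and method names are illustrative):

// Illustrative dispatch over the injected puller map, keyed by the loadSpec "type"
// ("local", "s3_zip", "hdfs", "c*" per the bindings above); not copied from OmniSegmentLoader.
private DataSegmentPuller getPuller(Map<String, Object> loadSpec) throws SegmentLoadingException
{
  final String type = MapUtils.getString(loadSpec, "type");
  final DataSegmentPuller puller = pullers.get(type);  // pullers: the injected Map<String, DataSegmentPuller>
  if (puller == null) {
    throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, pullers.keySet());
  }
  return puller;
}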

View File

@ -1,69 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
import com.metamx.common.MapUtils;
import io.druid.segment.Segment;
import io.druid.timeline.DataSegment;
import java.util.Map;
/**
*/
public class DelegatingSegmentLoader implements SegmentLoader
{
private volatile Map<String, SegmentLoader> loaderTypes;
public void setLoaderTypes(
Map<String, SegmentLoader> loaderTypes
)
{
this.loaderTypes = loaderTypes;
}
@Override
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
{
return getLoader(segment.getLoadSpec()).isSegmentLoaded(segment);
}
@Override
public Segment getSegment(DataSegment segment) throws SegmentLoadingException
{
return getLoader(segment.getLoadSpec()).getSegment(segment);
}
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
getLoader(segment.getLoadSpec()).cleanup(segment);
}
private SegmentLoader getLoader(Map<String, Object> loadSpec) throws SegmentLoadingException
{
String type = MapUtils.getString(loadSpec, "type");
SegmentLoader loader = loaderTypes.get(type);
if (loader == null) {
throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, loaderTypes.keySet());
}
return loader;
}
}

View File

@ -116,13 +116,14 @@ public class OmniSegmentLoader implements SegmentLoader
@Override
public Segment getSegment(DataSegment segment) throws SegmentLoadingException
{
File segmentFiles = loadSegmentFiles(segment);
File segmentFiles = getSegmentFiles(segment);
final QueryableIndex index = factory.factorize(segmentFiles);
return new QueryableIndexSegment(segment.getIdentifier(), index);
}
public File loadSegmentFiles(DataSegment segment) throws SegmentLoadingException
@Override
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
StorageLocation loc = findStorageLocationIfLoaded(segment);
@ -199,7 +200,7 @@ public class OmniSegmentLoader implements SegmentLoader
}
private static class StorageLocation
static class StorageLocation
{
private final File path;
private final long maxSize;
@ -218,41 +219,41 @@ public class OmniSegmentLoader implements SegmentLoader
this.segments = Sets.newHashSet();
}
private File getPath()
File getPath()
{
return path;
}
private Long getMaxSize()
Long getMaxSize()
{
return maxSize;
}
private synchronized void addSegment(DataSegment segment)
synchronized void addSegment(DataSegment segment)
{
if (segments.add(segment)) {
currSize += segment.getSize();
}
}
private synchronized void removeSegment(DataSegment segment)
synchronized void removeSegment(DataSegment segment)
{
if (segments.remove(segment)) {
currSize -= segment.getSize();
}
}
private boolean canHandle(long size)
boolean canHandle(long size)
{
return available() > size;
return available() >= size;
}
private synchronized long available()
synchronized long available()
{
return maxSize - currSize;
}
private StorageLocation mostEmpty(StorageLocation other)
StorageLocation mostEmpty(StorageLocation other)
{
return available() > other.available() ? this : other;
}
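Besides loosening visibility so the renamed test can reach StorageLocation, this hunk changes canHandle() from a strict to an inclusive comparison: a segment whose size exactly equals the remaining capacity is now accepted. A small illustration of the difference, along the lines of what OmniSegmentLoaderTest.verifyLoc() exercises (values are arbitrary):

// Illustration of the '>=' change in canHandle(); values are arbitrary.
OmniSegmentLoader.StorageLocation loc =
    new OmniSegmentLoader.StorageLocation(new File("/tmp"), 1000L);
// Nothing added yet, so available() == maxSize == 1000.
loc.canHandle(1000);  // true after this commit; the old '>' comparison returned false
loc.canHandle(1001);  // still false, the segment would not fit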

View File

@ -22,11 +22,14 @@ package io.druid.segment.loading;
import io.druid.segment.Segment;
import io.druid.timeline.DataSegment;
import java.io.File;
/**
*/
public interface SegmentLoader
{
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException;
public Segment getSegment(DataSegment loadSpec) throws SegmentLoadingException;
public Segment getSegment(DataSegment segment) throws SegmentLoadingException;
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
public void cleanup(DataSegment loadSpec) throws SegmentLoadingException;
}

View File

@ -1,249 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* TODO: Kill this along with the Guicification of the IndexingService stuff
*/
@Deprecated
public class SingleSegmentLoader implements SegmentLoader
{
private static final Logger log = new Logger(SingleSegmentLoader.class);
private final DataSegmentPuller dataSegmentPuller;
private final QueryableIndexFactory factory;
private final List<StorageLocation> locations;
@Inject
public SingleSegmentLoader(
DataSegmentPuller dataSegmentPuller,
QueryableIndexFactory factory,
SegmentLoaderConfig config
)
{
this.dataSegmentPuller = dataSegmentPuller;
this.factory = factory;
final ImmutableList.Builder<StorageLocation> locBuilder = ImmutableList.builder();
// TODO
// This is a really, really stupid way of getting this information. Splitting on commas and bars is error-prone
// We should instead switch it up to be a JSON Array of JSON Object or something and cool stuff like that
// But, that'll have to wait for some other day.
for (String dirSpec : config.getLocations().split(",")) {
String[] dirSplit = dirSpec.split("\\|");
if (dirSplit.length == 1) {
locBuilder.add(new StorageLocation(new File(dirSplit[0]), Integer.MAX_VALUE));
}
else if (dirSplit.length == 2) {
final Long maxSize = Longs.tryParse(dirSplit[1]);
if (maxSize == null) {
throw new IAE("Size of a local segment storage location must be an integral number, got[%s]", dirSplit[1]);
}
locBuilder.add(new StorageLocation(new File(dirSplit[0]), maxSize));
}
else {
throw new ISE(
"Unknown segment storage location[%s]=>[%s], config[%s].",
dirSplit.length, dirSpec, config.getLocations()
);
}
}
locations = locBuilder.build();
Preconditions.checkArgument(locations.size() > 0, "Must have at least one segment cache directory.");
log.info("Using storage locations[%s]", locations);
}
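// Illustration (not part of the original source): with the parsing above, a locations
// value such as "/mnt/disk1/segments|100000000000,/mnt/disk2/segments" yields two
// StorageLocations -- the first capped at 100,000,000,000 bytes, the second falling
// back to a cap of Integer.MAX_VALUE bytes. This comma/bar string format is what the
// TODO above wants to replace with a structured JSON array.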
@Override
public boolean isSegmentLoaded(final DataSegment segment)
{
return findStorageLocationIfLoaded(segment) != null;
}
public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
if (localStorageDir.exists()) {
return location;
}
}
return null;
}
@Override
public Segment getSegment(DataSegment segment) throws SegmentLoadingException
{
File segmentFiles = getSegmentFiles(segment);
final QueryableIndex index = factory.factorize(segmentFiles);
return new QueryableIndexSegment(segment.getIdentifier(), index);
}
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
StorageLocation loc = findStorageLocationIfLoaded(segment);
final File retVal;
if (loc == null) {
Iterator<StorageLocation> locIter = locations.iterator();
loc = locIter.next();
while (locIter.hasNext()) {
loc = loc.mostEmpty(locIter.next());
}
if (!loc.canHandle(segment.getSize())) {
throw new ISE(
"Segment[%s:%,d] too large for storage[%s:%,d].",
segment.getIdentifier(), segment.getSize(), loc.getPath(), loc.available()
);
}
File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
if (!storageDir.mkdirs()) {
log.debug("Unable to make parent file[%s]", storageDir);
}
dataSegmentPuller.getSegmentFiles(segment, storageDir);
loc.addSegment(segment);
retVal = storageDir;
}
else {
retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
}
loc.addSegment(segment);
return retVal;
}
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
StorageLocation loc = findStorageLocationIfLoaded(segment);
if (loc == null) {
log.info("Asked to cleanup something[%s] that didn't exist. Skipping.", segment);
return;
}
try {
File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
log.info("Deleting directory[%s]", cacheFile);
FileUtils.deleteDirectory(cacheFile);
loc.removeSegment(segment);
}
catch (IOException e) {
throw new SegmentLoadingException(e, e.getMessage());
}
}
static class StorageLocation
{
private final File path;
private final long maxSize;
private final Set<DataSegment> segments;
private volatile long currSize = 0;
StorageLocation(
File path,
long maxSize
)
{
this.path = path;
this.maxSize = maxSize;
this.segments = Sets.newHashSet();
}
File getPath()
{
return path;
}
Long getMaxSize()
{
return maxSize;
}
synchronized void addSegment(DataSegment segment)
{
if (segments.add(segment)) {
currSize += segment.getSize();
}
}
synchronized void removeSegment(DataSegment segment)
{
if (segments.remove(segment)) {
currSize -= segment.getSize();
}
}
boolean canHandle(long size)
{
return available() >= size;
}
synchronized long available()
{
return maxSize - currSize;
}
StorageLocation mostEmpty(StorageLocation other)
{
return available() > other.available() ? this : other;
}
@Override
public String toString()
{
return "StorageLocation{" +
"path=" + path +
", maxSize=" + maxSize +
'}';
}
}
}

View File

@ -77,6 +77,12 @@ public class CacheTestSegmentLoader implements SegmentLoader
};
}
@Override
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
throw new UnsupportedOperationException();
}
@Override
public void cleanup(DataSegment loadSpec) throws SegmentLoadingException
{

View File

@ -30,13 +30,13 @@ import java.util.Arrays;
/**
*/
public class SingleSegmentLoaderTest
public class OmniSegmentLoaderTest
{
@Test
public void testStorageLocation() throws Exception
{
long expectedAvail = 1000l;
SingleSegmentLoader.StorageLocation loc = new SingleSegmentLoader.StorageLocation(new File("/tmp"), expectedAvail);
OmniSegmentLoader.StorageLocation loc = new OmniSegmentLoader.StorageLocation(new File("/tmp"), expectedAvail);
verifyLoc(expectedAvail, loc);
@ -65,7 +65,7 @@ public class SingleSegmentLoaderTest
verifyLoc(expectedAvail, loc);
}
private void verifyLoc(long maxSize, SingleSegmentLoader.StorageLocation loc)
private void verifyLoc(long maxSize, OmniSegmentLoader.StorageLocation loc)
{
Assert.assertEquals(maxSize, loc.available());
for (int i = 0; i <= maxSize; ++i) {

View File

@ -65,6 +65,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
@ -114,6 +115,12 @@ public class ServerManagerTest
);
}
@Override
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
throw new UnsupportedOperationException();
}
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{

View File

@ -32,6 +32,7 @@ import io.druid.curator.CuratorModule;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.AWSModule;
import io.druid.guice.AnnouncerModule;
import io.druid.guice.DataSegmentPullerModule;
import io.druid.guice.DataSegmentPusherModule;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.HttpClientModule;
@ -99,6 +100,7 @@ public class CliPeon implements Runnable
new DiscoveryModule(),
new ServerViewModule(),
new StorageNodeModule(nodeType),
new DataSegmentPullerModule(),
new DataSegmentPusherModule(),
new AnnouncerModule(),
new DruidProcessingModule(),