From a2dcc45a8eadfdef196447f0f42300d59ee7eb6f Mon Sep 17 00:00:00 2001 From: cheddar Date: Thu, 12 Sep 2013 11:47:03 -0500 Subject: [PATCH] 1) Remove SingleSegmentLoader and replace with OmniSegmentLoader --- .../io/druid/indexing/common/TaskToolbox.java | 27 +- .../indexing/common/TaskToolboxFactory.java | 10 +- .../coordinator/TaskLifecycleTest.java | 2 +- .../druid/guice/DataSegmentPullerModule.java | 34 ++- .../java/io/druid/guice/DruidBinders.java | 6 + .../loading/DelegatingSegmentLoader.java | 69 ----- .../segment/loading/OmniSegmentLoader.java | 23 +- .../druid/segment/loading/SegmentLoader.java | 5 +- .../segment/loading/SingleSegmentLoader.java | 249 ------------------ .../loading/CacheTestSegmentLoader.java | 6 + ...erTest.java => OmniSegmentLoaderTest.java} | 6 +- .../coordination/ServerManagerTest.java | 7 + .../src/main/java/io/druid/cli/CliPeon.java | 2 + 13 files changed, 67 insertions(+), 379 deletions(-) delete mode 100644 server/src/main/java/io/druid/segment/loading/DelegatingSegmentLoader.java delete mode 100644 server/src/main/java/io/druid/segment/loading/SingleSegmentLoader.java rename server/src/test/java/io/druid/segment/loading/{SingleSegmentLoaderTest.java => OmniSegmentLoaderTest.java} (91%) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index 242e7dcb90c..dcef1147d17 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -31,14 +31,10 @@ import io.druid.indexing.common.task.Task; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.MMappedQueryableIndexFactory; -import io.druid.segment.loading.S3DataSegmentPuller; -import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoadingException; -import io.druid.segment.loading.SingleSegmentLoader; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; import java.io.File; import java.util.List; @@ -53,13 +49,13 @@ public class TaskToolbox private final Task task; private final TaskActionClientFactory taskActionClientFactory; private final ServiceEmitter emitter; - private final RestS3Service s3Client; private final DataSegmentPusher segmentPusher; private final DataSegmentKiller dataSegmentKiller; private final DataSegmentAnnouncer segmentAnnouncer; private final ServerView newSegmentServerView; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final MonitorScheduler monitorScheduler; + private final SegmentLoader segmentLoader; private final ObjectMapper objectMapper; public TaskToolbox( @@ -67,13 +63,13 @@ public class TaskToolbox Task task, TaskActionClientFactory taskActionClientFactory, ServiceEmitter emitter, - RestS3Service s3Client, DataSegmentPusher segmentPusher, DataSegmentKiller dataSegmentKiller, DataSegmentAnnouncer segmentAnnouncer, ServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, MonitorScheduler monitorScheduler, + SegmentLoader segmentLoader, ObjectMapper objectMapper ) { @@ -81,13 +77,13 @@ public class TaskToolbox this.task = task; this.taskActionClientFactory = taskActionClientFactory; this.emitter = emitter; - this.s3Client = s3Client; this.segmentPusher = segmentPusher; this.dataSegmentKiller = dataSegmentKiller; this.segmentAnnouncer = segmentAnnouncer; this.newSegmentServerView = newSegmentServerView; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; this.monitorScheduler = monitorScheduler; + this.segmentLoader = segmentLoader; this.objectMapper = objectMapper; } @@ -144,22 +140,9 @@ public class TaskToolbox public Map getSegments(List segments) throws SegmentLoadingException { - final SingleSegmentLoader loader = new SingleSegmentLoader( - new S3DataSegmentPuller(s3Client), - new MMappedQueryableIndexFactory(), - new SegmentLoaderConfig() - { - @Override - public String getLocations() - { - return new File(getTaskWorkDir(), "fetched_segments").toString(); - } - } - ); - Map retVal = Maps.newLinkedHashMap(); for (DataSegment segment : segments) { - retVal.put(segment, loader.getSegmentFiles(segment)); + retVal.put(segment, segmentLoader.getSegmentFiles(segment)); } return retVal; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index 5eb3d327e5a..fc1e8db390b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -30,8 +30,8 @@ import io.druid.indexing.common.task.Task; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.loading.SegmentLoader; import io.druid.server.coordination.DataSegmentAnnouncer; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; /** * Stuff that may be needed by a Task in order to conduct its business. @@ -41,13 +41,13 @@ public class TaskToolboxFactory private final TaskConfig config; private final TaskActionClientFactory taskActionClientFactory; private final ServiceEmitter emitter; - private final RestS3Service s3Client; private final DataSegmentPusher segmentPusher; private final DataSegmentKiller dataSegmentKiller; private final DataSegmentAnnouncer segmentAnnouncer; private final ServerView newSegmentServerView; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final MonitorScheduler monitorScheduler; + private final SegmentLoader segmentLoader; private final ObjectMapper objectMapper; @Inject @@ -55,26 +55,26 @@ public class TaskToolboxFactory TaskConfig config, TaskActionClientFactory taskActionClientFactory, ServiceEmitter emitter, - RestS3Service s3Client, DataSegmentPusher segmentPusher, DataSegmentKiller dataSegmentKiller, DataSegmentAnnouncer segmentAnnouncer, ServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, MonitorScheduler monitorScheduler, + SegmentLoader segmentLoader, ObjectMapper objectMapper ) { this.config = config; this.taskActionClientFactory = taskActionClientFactory; this.emitter = emitter; - this.s3Client = s3Client; this.segmentPusher = segmentPusher; this.dataSegmentKiller = dataSegmentKiller; this.segmentAnnouncer = segmentAnnouncer; this.newSegmentServerView = newSegmentServerView; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; this.monitorScheduler = monitorScheduler; + this.segmentLoader = segmentLoader; this.objectMapper = objectMapper; } @@ -85,13 +85,13 @@ public class TaskToolboxFactory task, taskActionClientFactory, emitter, - s3Client, segmentPusher, dataSegmentKiller, segmentAnnouncer, newSegmentServerView, queryRunnerFactoryConglomerate, monitorScheduler, + segmentLoader, objectMapper ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/coordinator/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/coordinator/TaskLifecycleTest.java index 6ea5c1c0d80..ceb30ffdea6 100644 --- a/indexing-service/src/test/java/io/druid/indexing/coordinator/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/coordinator/TaskLifecycleTest.java @@ -118,7 +118,6 @@ public class TaskLifecycleTest new TaskConfig(tmp.toString(), null, null, 50000), tac, newMockEmitter(), - null, // s3 client new DataSegmentPusher() { @Override @@ -139,6 +138,7 @@ public class TaskLifecycleTest null, // new segment server view null, // query runner factory conglomerate corporation unionized collective null, // monitor scheduler + null, // segment loader new DefaultObjectMapper() ); diff --git a/server/src/main/java/io/druid/guice/DataSegmentPullerModule.java b/server/src/main/java/io/druid/guice/DataSegmentPullerModule.java index 6b33cf401a5..3fd70ceecab 100644 --- a/server/src/main/java/io/druid/guice/DataSegmentPullerModule.java +++ b/server/src/main/java/io/druid/guice/DataSegmentPullerModule.java @@ -21,8 +21,6 @@ package io.druid.guice; import com.google.inject.Binder; import com.google.inject.Module; -import com.google.inject.multibindings.MapBinder; -import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.HdfsDataSegmentPuller; import io.druid.segment.loading.LocalDataSegmentPuller; import io.druid.segment.loading.OmniSegmentLoader; @@ -49,35 +47,35 @@ public class DataSegmentPullerModule implements Module private static void bindDeepStorageLocal(Binder binder) { - final MapBinder segmentPullerBinder = MapBinder.newMapBinder( - binder, String.class, DataSegmentPuller.class - ); - segmentPullerBinder.addBinding("local").to(LocalDataSegmentPuller.class).in(LazySingleton.class); + DruidBinders.dataSegmentPullerBinder(binder) + .addBinding("local") + .to(LocalDataSegmentPuller.class) + .in(LazySingleton.class); } private static void bindDeepStorageS3(Binder binder) { - final MapBinder segmentPullerBinder = MapBinder.newMapBinder( - binder, String.class, DataSegmentPuller.class - ); - segmentPullerBinder.addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class); + DruidBinders.dataSegmentPullerBinder(binder) + .addBinding("s3_zip") + .to(S3DataSegmentPuller.class) + .in(LazySingleton.class); } private static void bindDeepStorageHdfs(Binder binder) { - final MapBinder segmentPullerBinder = MapBinder.newMapBinder( - binder, String.class, DataSegmentPuller.class - ); - segmentPullerBinder.addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class); + DruidBinders.dataSegmentPullerBinder(binder) + .addBinding("hdfs") + .to(HdfsDataSegmentPuller.class) + .in(LazySingleton.class); binder.bind(Configuration.class).toInstance(new Configuration()); } private static void bindDeepStorageCassandra(Binder binder) { - final MapBinder segmentPullerBinder = MapBinder.newMapBinder( - binder, String.class, DataSegmentPuller.class - ); - segmentPullerBinder.addBinding("c*").to(CassandraDataSegmentPuller.class).in(LazySingleton.class); + DruidBinders.dataSegmentPullerBinder(binder) + .addBinding("c*") + .to(CassandraDataSegmentPuller.class) + .in(LazySingleton.class); ConfigProvider.bind(binder, CassandraDataSegmentConfig.class); } } diff --git a/server/src/main/java/io/druid/guice/DruidBinders.java b/server/src/main/java/io/druid/guice/DruidBinders.java index 4a3b6729380..5bc84227cd5 100644 --- a/server/src/main/java/io/druid/guice/DruidBinders.java +++ b/server/src/main/java/io/druid/guice/DruidBinders.java @@ -25,6 +25,7 @@ import com.google.inject.multibindings.MapBinder; import io.druid.query.Query; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.segment.loading.DataSegmentPuller; /** */ @@ -49,4 +50,9 @@ public class DruidBinders } ); } + + public static MapBinder dataSegmentPullerBinder(Binder binder) + { + return MapBinder.newMapBinder(binder, String.class, DataSegmentPuller.class); + } } diff --git a/server/src/main/java/io/druid/segment/loading/DelegatingSegmentLoader.java b/server/src/main/java/io/druid/segment/loading/DelegatingSegmentLoader.java deleted file mode 100644 index 6f907a61813..00000000000 --- a/server/src/main/java/io/druid/segment/loading/DelegatingSegmentLoader.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.segment.loading; - -import com.metamx.common.MapUtils; -import io.druid.segment.Segment; -import io.druid.timeline.DataSegment; - -import java.util.Map; - -/** - */ -public class DelegatingSegmentLoader implements SegmentLoader -{ - private volatile Map loaderTypes; - - public void setLoaderTypes( - Map loaderTypes - ) - { - this.loaderTypes = loaderTypes; - } - - @Override - public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException - { - return getLoader(segment.getLoadSpec()).isSegmentLoaded(segment); - } - - @Override - public Segment getSegment(DataSegment segment) throws SegmentLoadingException - { - return getLoader(segment.getLoadSpec()).getSegment(segment); - } - - @Override - public void cleanup(DataSegment segment) throws SegmentLoadingException - { - getLoader(segment.getLoadSpec()).cleanup(segment); - } - - private SegmentLoader getLoader(Map loadSpec) throws SegmentLoadingException - { - String type = MapUtils.getString(loadSpec, "type"); - SegmentLoader loader = loaderTypes.get(type); - - if (loader == null) { - throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, loaderTypes.keySet()); - } - return loader; - } -} diff --git a/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java b/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java index 16b2d50d597..de86620a57a 100644 --- a/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java +++ b/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java @@ -116,13 +116,14 @@ public class OmniSegmentLoader implements SegmentLoader @Override public Segment getSegment(DataSegment segment) throws SegmentLoadingException { - File segmentFiles = loadSegmentFiles(segment); + File segmentFiles = getSegmentFiles(segment); final QueryableIndex index = factory.factorize(segmentFiles); return new QueryableIndexSegment(segment.getIdentifier(), index); } - public File loadSegmentFiles(DataSegment segment) throws SegmentLoadingException + @Override + public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException { StorageLocation loc = findStorageLocationIfLoaded(segment); @@ -199,7 +200,7 @@ public class OmniSegmentLoader implements SegmentLoader } - private static class StorageLocation + static class StorageLocation { private final File path; private final long maxSize; @@ -218,41 +219,41 @@ public class OmniSegmentLoader implements SegmentLoader this.segments = Sets.newHashSet(); } - private File getPath() + File getPath() { return path; } - private Long getMaxSize() + Long getMaxSize() { return maxSize; } - private synchronized void addSegment(DataSegment segment) + synchronized void addSegment(DataSegment segment) { if (segments.add(segment)) { currSize += segment.getSize(); } } - private synchronized void removeSegment(DataSegment segment) + synchronized void removeSegment(DataSegment segment) { if (segments.remove(segment)) { currSize -= segment.getSize(); } } - private boolean canHandle(long size) + boolean canHandle(long size) { - return available() > size; + return available() >= size; } - private synchronized long available() + synchronized long available() { return maxSize - currSize; } - private StorageLocation mostEmpty(StorageLocation other) + StorageLocation mostEmpty(StorageLocation other) { return available() > other.available() ? this : other; } diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoader.java b/server/src/main/java/io/druid/segment/loading/SegmentLoader.java index 794b885449f..d4c050b6044 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoader.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoader.java @@ -22,11 +22,14 @@ package io.druid.segment.loading; import io.druid.segment.Segment; import io.druid.timeline.DataSegment; +import java.io.File; + /** */ public interface SegmentLoader { public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException; - public Segment getSegment(DataSegment loadSpec) throws SegmentLoadingException; + public Segment getSegment(DataSegment segment) throws SegmentLoadingException; + public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException; public void cleanup(DataSegment loadSpec) throws SegmentLoadingException; } diff --git a/server/src/main/java/io/druid/segment/loading/SingleSegmentLoader.java b/server/src/main/java/io/druid/segment/loading/SingleSegmentLoader.java deleted file mode 100644 index e24f339b9b7..00000000000 --- a/server/src/main/java/io/druid/segment/loading/SingleSegmentLoader.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.segment.loading; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; -import com.google.common.primitives.Longs; -import com.google.inject.Inject; -import com.metamx.common.IAE; -import com.metamx.common.ISE; -import com.metamx.common.logger.Logger; -import io.druid.segment.QueryableIndex; -import io.druid.segment.QueryableIndexSegment; -import io.druid.segment.Segment; -import io.druid.timeline.DataSegment; -import org.apache.commons.io.FileUtils; - -import java.io.File; -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -/** - * TODO: Kill this along with the Guicification of the IndexingService stuff - */ -@Deprecated -public class SingleSegmentLoader implements SegmentLoader -{ - private static final Logger log = new Logger(SingleSegmentLoader.class); - - private final DataSegmentPuller dataSegmentPuller; - private final QueryableIndexFactory factory; - - private final List locations; - - @Inject - public SingleSegmentLoader( - DataSegmentPuller dataSegmentPuller, - QueryableIndexFactory factory, - SegmentLoaderConfig config - ) - { - this.dataSegmentPuller = dataSegmentPuller; - this.factory = factory; - - final ImmutableList.Builder locBuilder = ImmutableList.builder(); - - // TODO - // This is a really, really stupid way of getting this information. Splitting on commas and bars is error-prone - // We should instead switch it up to be a JSON Array of JSON Object or something and cool stuff like that - // But, that'll have to wait for some other day. - for (String dirSpec : config.getLocations().split(",")) { - String[] dirSplit = dirSpec.split("\\|"); - if (dirSplit.length == 1) { - locBuilder.add(new StorageLocation(new File(dirSplit[0]), Integer.MAX_VALUE)); - } - else if (dirSplit.length == 2) { - final Long maxSize = Longs.tryParse(dirSplit[1]); - if (maxSize == null) { - throw new IAE("Size of a local segment storage location must be an integral number, got[%s]", dirSplit[1]); - } - locBuilder.add(new StorageLocation(new File(dirSplit[0]), maxSize)); - } - else { - throw new ISE( - "Unknown segment storage location[%s]=>[%s], config[%s].", - dirSplit.length, dirSpec, config.getLocations() - ); - } - } - locations = locBuilder.build(); - - Preconditions.checkArgument(locations.size() > 0, "Must have at least one segment cache directory."); - log.info("Using storage locations[%s]", locations); - } - - @Override - public boolean isSegmentLoaded(final DataSegment segment) - { - return findStorageLocationIfLoaded(segment) != null; - } - - public StorageLocation findStorageLocationIfLoaded(final DataSegment segment) - { - for (StorageLocation location : locations) { - File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); - if (localStorageDir.exists()) { - return location; - } - } - return null; - } - - @Override - public Segment getSegment(DataSegment segment) throws SegmentLoadingException - { - File segmentFiles = getSegmentFiles(segment); - final QueryableIndex index = factory.factorize(segmentFiles); - - return new QueryableIndexSegment(segment.getIdentifier(), index); - } - - public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException - { - StorageLocation loc = findStorageLocationIfLoaded(segment); - - final File retVal; - - if (loc == null) { - Iterator locIter = locations.iterator(); - loc = locIter.next(); - while (locIter.hasNext()) { - loc = loc.mostEmpty(locIter.next()); - } - - if (!loc.canHandle(segment.getSize())) { - throw new ISE( - "Segment[%s:%,d] too large for storage[%s:%,d].", - segment.getIdentifier(), segment.getSize(), loc.getPath(), loc.available() - ); - } - - File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); - if (!storageDir.mkdirs()) { - log.debug("Unable to make parent file[%s]", storageDir); - } - - dataSegmentPuller.getSegmentFiles(segment, storageDir); - loc.addSegment(segment); - - retVal = storageDir; - } - else { - retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); - } - - loc.addSegment(segment); - - return retVal; - } - - @Override - public void cleanup(DataSegment segment) throws SegmentLoadingException - { - StorageLocation loc = findStorageLocationIfLoaded(segment); - - if (loc == null) { - log.info("Asked to cleanup something[%s] that didn't exist. Skipping.", segment); - return; - } - - try { - File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); - log.info("Deleting directory[%s]", cacheFile); - FileUtils.deleteDirectory(cacheFile); - loc.removeSegment(segment); - } - catch (IOException e) { - throw new SegmentLoadingException(e, e.getMessage()); - } - } - - static class StorageLocation - { - private final File path; - private final long maxSize; - private final Set segments; - - private volatile long currSize = 0; - - StorageLocation( - File path, - long maxSize - ) - { - this.path = path; - this.maxSize = maxSize; - - this.segments = Sets.newHashSet(); - } - - File getPath() - { - return path; - } - - Long getMaxSize() - { - return maxSize; - } - - synchronized void addSegment(DataSegment segment) - { - if (segments.add(segment)) { - currSize += segment.getSize(); - } - } - - synchronized void removeSegment(DataSegment segment) - { - if (segments.remove(segment)) { - currSize -= segment.getSize(); - } - } - - boolean canHandle(long size) - { - return available() >= size; - } - - synchronized long available() - { - return maxSize - currSize; - } - - StorageLocation mostEmpty(StorageLocation other) - { - return available() > other.available() ? this : other; - } - - @Override - public String toString() - { - return "StorageLocation{" + - "path=" + path + - ", maxSize=" + maxSize + - '}'; - } - } -} diff --git a/server/src/test/java/io/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/io/druid/segment/loading/CacheTestSegmentLoader.java index 9ccb2675a00..42f16fd21c1 100644 --- a/server/src/test/java/io/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/io/druid/segment/loading/CacheTestSegmentLoader.java @@ -77,6 +77,12 @@ public class CacheTestSegmentLoader implements SegmentLoader }; } + @Override + public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException + { + throw new UnsupportedOperationException(); + } + @Override public void cleanup(DataSegment loadSpec) throws SegmentLoadingException { diff --git a/server/src/test/java/io/druid/segment/loading/SingleSegmentLoaderTest.java b/server/src/test/java/io/druid/segment/loading/OmniSegmentLoaderTest.java similarity index 91% rename from server/src/test/java/io/druid/segment/loading/SingleSegmentLoaderTest.java rename to server/src/test/java/io/druid/segment/loading/OmniSegmentLoaderTest.java index ebba47cf5f5..90949215298 100644 --- a/server/src/test/java/io/druid/segment/loading/SingleSegmentLoaderTest.java +++ b/server/src/test/java/io/druid/segment/loading/OmniSegmentLoaderTest.java @@ -30,13 +30,13 @@ import java.util.Arrays; /** */ -public class SingleSegmentLoaderTest +public class OmniSegmentLoaderTest { @Test public void testStorageLocation() throws Exception { long expectedAvail = 1000l; - SingleSegmentLoader.StorageLocation loc = new SingleSegmentLoader.StorageLocation(new File("/tmp"), expectedAvail); + OmniSegmentLoader.StorageLocation loc = new OmniSegmentLoader.StorageLocation(new File("/tmp"), expectedAvail); verifyLoc(expectedAvail, loc); @@ -65,7 +65,7 @@ public class SingleSegmentLoaderTest verifyLoc(expectedAvail, loc); } - private void verifyLoc(long maxSize, SingleSegmentLoader.StorageLocation loc) + private void verifyLoc(long maxSize, OmniSegmentLoader.StorageLocation loc) { Assert.assertEquals(maxSize, loc.available()); for (int i = 0; i <= maxSize; ++i) { diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 08fd201af74..f3483487c1d 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -65,6 +65,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Iterator; @@ -114,6 +115,12 @@ public class ServerManagerTest ); } + @Override + public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException + { + throw new UnsupportedOperationException(); + } + @Override public void cleanup(DataSegment segment) throws SegmentLoadingException { diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 4147b249775..9962e6d93ce 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -32,6 +32,7 @@ import io.druid.curator.CuratorModule; import io.druid.curator.discovery.DiscoveryModule; import io.druid.guice.AWSModule; import io.druid.guice.AnnouncerModule; +import io.druid.guice.DataSegmentPullerModule; import io.druid.guice.DataSegmentPusherModule; import io.druid.guice.DruidProcessingModule; import io.druid.guice.HttpClientModule; @@ -99,6 +100,7 @@ public class CliPeon implements Runnable new DiscoveryModule(), new ServerViewModule(), new StorageNodeModule(nodeType), + new DataSegmentPullerModule(), new DataSegmentPusherModule(), new AnnouncerModule(), new DruidProcessingModule(),