From 485e381387edf1795c5ebbb8002537f14b4ed8cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 29 Jun 2016 08:53:45 -0700 Subject: [PATCH] remove datasource from hadoop output path (#3196) fixes #2083, follow-up to #1702 --- .../segment/loading/DataSegmentPusher.java | 6 ++++-- .../storage/azure/AzureDataSegmentPusher.java | 7 +++++++ .../cassandra/CassandraDataSegmentPusher.java | 19 +++++++++++++------ .../CloudFilesDataSegmentPusher.java | 9 ++++++++- .../storage/hdfs/HdfsDataSegmentPusher.java | 7 +++++++ .../druid/storage/s3/S3DataSegmentPusher.java | 9 ++++++++- .../indexing/common/task/HadoopIndexTask.java | 2 +- .../druid/indexing/common/task/IndexTask.java | 9 ++++++++- .../indexing/common/task/IndexTaskTest.java | 7 +++++++ .../IngestSegmentFirehoseFactoryTest.java | 7 +++++++ .../indexing/overlord/TaskLifecycleTest.java | 16 +++++++++++++++- .../indexing/test/TestDataSegmentPusher.java | 7 +++++++ .../loading/LocalDataSegmentPusher.java | 9 ++++++++- .../loading/LocalDataSegmentPusherTest.java | 8 ++++---- .../appenderator/AppenderatorTester.java | 7 +++++++ .../java/io/druid/cli/CliRealtimeExample.java | 10 +++++++++- 16 files changed, 120 insertions(+), 19 deletions(-) diff --git a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java index aeddcb3ea8b..225d46342c1 100644 --- a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java +++ b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java @@ -26,6 +26,8 @@ import java.io.IOException; public interface DataSegmentPusher { - public String getPathForHadoop(String dataSource); - public DataSegment push(File file, DataSegment segment) throws IOException; + @Deprecated + String getPathForHadoop(String dataSource); + String getPathForHadoop(); + DataSegment push(File file, DataSegment segment) throws IOException; } diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java index 107779e9ad6..6c6de8b9a72 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java @@ -58,8 +58,15 @@ public class AzureDataSegmentPusher implements DataSegmentPusher this.jsonMapper = jsonMapper; } + @Deprecated @Override public String getPathForHadoop(String dataSource) + { + return getPathForHadoop(); + } + + @Override + public String getPathForHadoop() { return null; } diff --git a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java index 9e92b5913cf..f42c8e328c8 100644 --- a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java +++ b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java @@ -58,17 +58,24 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data } @Override - public String getPathForHadoop(String dataSource) + public String getPathForHadoop() { throw new UnsupportedOperationException("Cassandra storage does not support indexing via Hadoop"); } + @Deprecated @Override - public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException - { - log.info("Writing [%s] to C*", indexFilesDir); - String key = JOINER.join( - config.getKeyspace().isEmpty() ? null : config.getKeyspace(), + public String getPathForHadoop(String dataSource) + { + return getPathForHadoop(); + } + + @Override + public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException + { + log.info("Writing [%s] to C*", indexFilesDir); + String key = JOINER.join( + config.getKeyspace().isEmpty() ? null : config.getKeyspace(), DataSegmentPusherUtil.getStorageDir(segment) ); diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java index fa34d3ac53d..d116aa2513f 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java @@ -59,11 +59,18 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher } @Override - public String getPathForHadoop(final String dataSource) + public String getPathForHadoop() { return null; } + @Deprecated + @Override + public String getPathForHadoop(final String dataSource) + { + return getPathForHadoop(); + } + @Override public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException { diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index 23eb649680a..f6340e7c864 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -63,8 +63,15 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher log.info("Configured HDFS as deep storage"); } + @Deprecated @Override public String getPathForHadoop(String dataSource) + { + return getPathForHadoop(); + } + + @Override + public String getPathForHadoop() { return new Path(config.getStorageDirectory()).toUri().toString(); } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java index 4d7c7f8cf69..fd33ba1ecef 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java @@ -61,10 +61,17 @@ public class S3DataSegmentPusher implements DataSegmentPusher log.info("Configured S3 as deep storage"); } + @Override + public String getPathForHadoop() + { + return String.format("s3n://%s/%s", config.getBucket(), config.getBaseKey()); + } + + @Deprecated @Override public String getPathForHadoop(String dataSource) { - return String.format("s3n://%s/%s/%s", config.getBucket(), config.getBaseKey(), dataSource); + return getPathForHadoop(); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 28f177f602c..41886d9a227 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -177,7 +177,7 @@ public class HadoopIndexTask extends HadoopTask new String[]{ toolbox.getObjectMapper().writeValueAsString(spec), toolbox.getConfig().getHadoopWorkingPath(), - toolbox.getSegmentPusher().getPathForHadoop(getDataSource()) + toolbox.getSegmentPusher().getPathForHadoop() }, loader ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 87c21e424b3..4918af9e402 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -339,10 +339,17 @@ public class IndexTask extends AbstractFixedIntervalTask final List pushedSegments = new CopyOnWriteArrayList(); final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher() { + @Deprecated @Override public String getPathForHadoop(String dataSource) { - return toolbox.getSegmentPusher().getPathForHadoop(dataSource); + return getPathForHadoop(); + } + + @Override + public String getPathForHadoop() + { + return toolbox.getSegmentPusher().getPathForHadoop(); } @Override diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 7490ff3fdd1..a67d5f2688a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -242,8 +242,15 @@ public class IndexTaskTest } }, null, new DataSegmentPusher() { + @Deprecated @Override public String getPathForHadoop(String dataSource) + { + return getPathForHadoop(); + } + + @Override + public String getPathForHadoop() { return null; } diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index e363bd3fc05..b5f96e58877 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -213,8 +213,15 @@ public class IngestSegmentFirehoseFactoryTest newMockEmitter(), new DataSegmentPusher() { + @Deprecated @Override public String getPathForHadoop(String dataSource) + { + return getPathForHadoop(); + } + + @Override + public String getPathForHadoop() { throw new UnsupportedOperationException(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 907293b6598..d17d471edeb 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -456,11 +456,18 @@ public class TaskLifecycleTest return new DataSegmentPusher() { @Override - public String getPathForHadoop(String dataSource) + public String getPathForHadoop() { throw new UnsupportedOperationException(); } + @Deprecated + @Override + public String getPathForHadoop(String dataSource) + { + return getPathForHadoop(); + } + @Override public DataSegment push(File file, DataSegment segment) throws IOException { @@ -993,8 +1000,15 @@ public class TaskLifecycleTest { dataSegmentPusher = new DataSegmentPusher() { + @Deprecated @Override public String getPathForHadoop(String s) + { + return getPathForHadoop(); + } + + @Override + public String getPathForHadoop() { throw new UnsupportedOperationException(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java index b9bda3b6884..ad99827052d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java @@ -32,8 +32,15 @@ public class TestDataSegmentPusher implements DataSegmentPusher { private final Set pushedSegments = Sets.newConcurrentHashSet(); + @Deprecated @Override public String getPathForHadoop(String dataSource) + { + return getPathForHadoop(); + } + + @Override + public String getPathForHadoop() { throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java index d73ea5fd0cc..2e86c2636a9 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java @@ -53,10 +53,17 @@ public class LocalDataSegmentPusher implements DataSegmentPusher log.info("Configured local filesystem as deep storage"); } + @Override + public String getPathForHadoop() + { + return config.getStorageDirectory().getAbsoluteFile().toURI().toString(); + } + + @Deprecated @Override public String getPathForHadoop(String dataSource) { - return new File(config.getStorageDirectory().getAbsoluteFile(), dataSource).toURI().toString(); + return getPathForHadoop(); } @Override diff --git a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java index 7d9e9ab3c8b..9fa62e9baa7 100644 --- a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java +++ b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java @@ -120,8 +120,8 @@ public class LocalDataSegmentPusherTest config.storageDirectory = new File("/druid"); Assert.assertEquals( - "file:/druid/foo", - new LocalDataSegmentPusher(config, new ObjectMapper()).getPathForHadoop("foo") + "file:/druid", + new LocalDataSegmentPusher(config, new ObjectMapper()).getPathForHadoop() ); } @@ -131,8 +131,8 @@ public class LocalDataSegmentPusherTest config.storageDirectory = new File("druid"); Assert.assertEquals( - String.format("file:%s/druid/foo", System.getProperty("user.dir")), - new LocalDataSegmentPusher(config, new ObjectMapper()).getPathForHadoop("foo") + String.format("file:%s/druid", System.getProperty("user.dir")), + new LocalDataSegmentPusher(config, new ObjectMapper()).getPathForHadoop() ); } } diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index 1b73c6f0faf..661d4f041cb 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -165,8 +165,15 @@ public class AppenderatorTester implements AutoCloseable EmittingLogger.registerEmitter(emitter); dataSegmentPusher = new DataSegmentPusher() { + @Deprecated @Override public String getPathForHadoop(String dataSource) + { + return getPathForHadoop(); + } + + @Override + public String getPathForHadoop() { throw new UnsupportedOperationException(); } diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index cca461caf0a..7631d9d7da3 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -121,10 +121,18 @@ public class CliRealtimeExample extends ServerRunnable private static class NoopDataSegmentPusher implements DataSegmentPusher { + + @Override + public String getPathForHadoop() + { + return "noop"; + } + + @Deprecated @Override public String getPathForHadoop(String dataSource) { - return dataSource; + return getPathForHadoop(); } @Override