remove datasource from hadoop output path (#3196)

Fixes #2083; follow-up to #1702
Xavier Léauté 2016-06-29 08:53:45 -07:00 committed by Charles Allen
parent 4c9aeb7353
commit 485e381387
16 changed files with 120 additions and 19 deletions
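The change in one picture: the Hadoop indexing output path used to be derived per datasource, and is now a single datasource-agnostic base path. A minimal caller-side sketch of the migration (the pusher and task variables are illustrative, not from the patch):

    // Before this commit: the output path embedded the datasource name.
    String before = pusher.getPathForHadoop(task.getDataSource());

    // After this commit: one base path for all datasources; the
    // one-argument overload is deprecated and ignores its argument.
    String after = pusher.getPathForHadoop();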


@@ -26,6 +26,8 @@ import java.io.IOException;
 public interface DataSegmentPusher
 {
-  public String getPathForHadoop(String dataSource);
-  public DataSegment push(File file, DataSegment segment) throws IOException;
+  @Deprecated
+  String getPathForHadoop(String dataSource);
+  String getPathForHadoop();
+  DataSegment push(File file, DataSegment segment) throws IOException;
 }
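Every implementation touched below follows the same mechanical pattern: move the real path logic into the new no-argument method and have the deprecated overload delegate to it. A self-contained sketch for a hypothetical pusher (ExamplePusher and its hard-coded base path are illustrative, not part of this commit; imports assume the io.druid coordinates of the 2016 codebase):

    import java.io.File;
    import java.io.IOException;

    import io.druid.segment.loading.DataSegmentPusher;
    import io.druid.timeline.DataSegment;

    public class ExamplePusher implements DataSegmentPusher
    {
      @Override
      public String getPathForHadoop()
      {
        // All path logic now lives here, with no datasource component.
        return "hdfs://example-namenode/druid/segments";
      }

      @Deprecated
      @Override
      public String getPathForHadoop(String dataSource)
      {
        // Kept for compatibility; the argument is intentionally ignored.
        return getPathForHadoop();
      }

      @Override
      public DataSegment push(File file, DataSegment segment) throws IOException
      {
        throw new UnsupportedOperationException("not relevant to this sketch");
      }
    }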


@@ -58,8 +58,15 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
     this.jsonMapper = jsonMapper;
   }

+  @Deprecated
   @Override
   public String getPathForHadoop(String dataSource)
+  {
+    return getPathForHadoop();
+  }
+
+  @Override
+  public String getPathForHadoop()
   {
     return null;
   }


@@ -58,11 +58,18 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements DataSegmentPusher
   }

   @Override
-  public String getPathForHadoop(String dataSource)
+  public String getPathForHadoop()
   {
     throw new UnsupportedOperationException("Cassandra storage does not support indexing via Hadoop");
   }

+  @Deprecated
+  @Override
+  public String getPathForHadoop(String dataSource)
+  {
+    return getPathForHadoop();
+  }
+
   @Override
   public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
   {


@@ -59,11 +59,18 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
   }

   @Override
-  public String getPathForHadoop(final String dataSource)
+  public String getPathForHadoop()
   {
     return null;
   }

+  @Deprecated
+  @Override
+  public String getPathForHadoop(final String dataSource)
+  {
+    return getPathForHadoop();
+  }
+
   @Override
   public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
   {


@@ -63,8 +63,15 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
     log.info("Configured HDFS as deep storage");
   }

+  @Deprecated
   @Override
   public String getPathForHadoop(String dataSource)
+  {
+    return getPathForHadoop();
+  }
+
+  @Override
+  public String getPathForHadoop()
   {
     return new Path(config.getStorageDirectory()).toUri().toString();
   }


@@ -61,10 +61,17 @@ public class S3DataSegmentPusher implements DataSegmentPusher
     log.info("Configured S3 as deep storage");
   }

+  @Override
+  public String getPathForHadoop()
+  {
+    return String.format("s3n://%s/%s", config.getBucket(), config.getBaseKey());
+  }
+
+  @Deprecated
   @Override
   public String getPathForHadoop(String dataSource)
   {
-    return String.format("s3n://%s/%s/%s", config.getBucket(), config.getBaseKey(), dataSource);
+    return getPathForHadoop();
   }

   @Override
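Concretely, with illustrative config values (my-bucket and druid/segments are assumptions, not from the patch), the S3 base path loses its trailing datasource component:

    // config.getBucket() = "my-bucket", config.getBaseKey() = "druid/segments"
    // before: getPathForHadoop("wikipedia") -> "s3n://my-bucket/druid/segments/wikipedia"
    // after:  getPathForHadoop()            -> "s3n://my-bucket/druid/segments"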


@@ -177,7 +177,7 @@ public class HadoopIndexTask extends HadoopTask
         new String[]{
             toolbox.getObjectMapper().writeValueAsString(spec),
             toolbox.getConfig().getHadoopWorkingPath(),
-            toolbox.getSegmentPusher().getPathForHadoop(getDataSource())
+            toolbox.getSegmentPusher().getPathForHadoop()
         },
         loader
     );


@@ -339,10 +339,17 @@ public class IndexTask extends AbstractFixedIntervalTask
     final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<DataSegment>();
     final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher()
     {
+      @Deprecated
       @Override
       public String getPathForHadoop(String dataSource)
       {
-        return toolbox.getSegmentPusher().getPathForHadoop(dataSource);
+        return getPathForHadoop();
+      }
+
+      @Override
+      public String getPathForHadoop()
+      {
+        return toolbox.getSegmentPusher().getPathForHadoop();
       }

       @Override


@@ -242,8 +242,15 @@ public class IndexTaskTest
         }
       }, null, new DataSegmentPusher()
       {
+        @Deprecated
         @Override
         public String getPathForHadoop(String dataSource)
+        {
+          return getPathForHadoop();
+        }
+
+        @Override
+        public String getPathForHadoop()
         {
           return null;
         }


@@ -213,8 +213,15 @@ public class IngestSegmentFirehoseFactoryTest
         newMockEmitter(),
         new DataSegmentPusher()
         {
+          @Deprecated
           @Override
           public String getPathForHadoop(String dataSource)
+          {
+            return getPathForHadoop();
+          }
+
+          @Override
+          public String getPathForHadoop()
           {
             throw new UnsupportedOperationException();
           }


@@ -456,11 +456,18 @@ public class TaskLifecycleTest
     return new DataSegmentPusher()
     {
       @Override
-      public String getPathForHadoop(String dataSource)
+      public String getPathForHadoop()
       {
         throw new UnsupportedOperationException();
       }

+      @Deprecated
+      @Override
+      public String getPathForHadoop(String dataSource)
+      {
+        return getPathForHadoop();
+      }
+
       @Override
       public DataSegment push(File file, DataSegment segment) throws IOException
       {
@@ -993,8 +1000,15 @@ public class TaskLifecycleTest
     {
       dataSegmentPusher = new DataSegmentPusher()
       {
+        @Deprecated
         @Override
         public String getPathForHadoop(String s)
+        {
+          return getPathForHadoop();
+        }
+
+        @Override
+        public String getPathForHadoop()
         {
           throw new UnsupportedOperationException();
         }


@@ -32,8 +32,15 @@ public class TestDataSegmentPusher implements DataSegmentPusher
 {
   private final Set<DataSegment> pushedSegments = Sets.newConcurrentHashSet();

+  @Deprecated
   @Override
   public String getPathForHadoop(String dataSource)
+  {
+    return getPathForHadoop();
+  }
+
+  @Override
+  public String getPathForHadoop()
   {
     throw new UnsupportedOperationException();
   }


@@ -53,10 +53,17 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
     log.info("Configured local filesystem as deep storage");
   }

+  @Override
+  public String getPathForHadoop()
+  {
+    return config.getStorageDirectory().getAbsoluteFile().toURI().toString();
+  }
+
+  @Deprecated
   @Override
   public String getPathForHadoop(String dataSource)
   {
-    return new File(config.getStorageDirectory().getAbsoluteFile(), dataSource).toURI().toString();
+    return getPathForHadoop();
   }

   @Override


@@ -120,8 +120,8 @@ public class LocalDataSegmentPusherTest
     config.storageDirectory = new File("/druid");

     Assert.assertEquals(
-        "file:/druid/foo",
-        new LocalDataSegmentPusher(config, new ObjectMapper()).getPathForHadoop("foo")
+        "file:/druid",
+        new LocalDataSegmentPusher(config, new ObjectMapper()).getPathForHadoop()
     );
   }
@@ -131,8 +131,8 @@ public class LocalDataSegmentPusherTest
     config.storageDirectory = new File("druid");

     Assert.assertEquals(
-        String.format("file:%s/druid/foo", System.getProperty("user.dir")),
-        new LocalDataSegmentPusher(config, new ObjectMapper()).getPathForHadoop("foo")
+        String.format("file:%s/druid", System.getProperty("user.dir")),
+        new LocalDataSegmentPusher(config, new ObjectMapper()).getPathForHadoop()
     );
   }
 }


@@ -165,8 +165,15 @@ public class AppenderatorTester implements AutoCloseable
     EmittingLogger.registerEmitter(emitter);

     dataSegmentPusher = new DataSegmentPusher()
     {
+      @Deprecated
       @Override
       public String getPathForHadoop(String dataSource)
+      {
+        return getPathForHadoop();
+      }
+
+      @Override
+      public String getPathForHadoop()
       {
         throw new UnsupportedOperationException();
       }


@@ -121,10 +121,18 @@ public class CliRealtimeExample extends ServerRunnable
   private static class NoopDataSegmentPusher implements DataSegmentPusher
   {
+    @Override
+    public String getPathForHadoop()
+    {
+      return "noop";
+    }
+
+    @Deprecated
     @Override
     public String getPathForHadoop(String dataSource)
     {
-      return dataSource;
+      return getPathForHadoop();
     }

     @Override