Merge pull request #262 from metamx/fix-hadoop

Fix HadoopDruidIndexer to work with the new way of things
fjy 2013-10-09 13:27:49 -07:00
commit 3364b9ae5e
153 changed files with 1114 additions and 2484 deletions

View File

@ -33,15 +33,18 @@
<dependencies>
<dependency>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.astyanax</groupId>
<artifactId>astyanax</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<!-- Tests -->
<dependency>

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading.cassandra;
package io.druid.storage.cassandra;
import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading.cassandra;
package io.druid.storage.cassandra;
import com.google.common.io.Files;
import com.google.inject.Inject;
@ -28,10 +28,10 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;
import com.netflix.astyanax.recipes.storage.ObjectMetadata;
import io.druid.common.utils.CompressionUtils;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import org.apache.commons.io.FileUtils;
import java.io.File;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading.cassandra;
package io.druid.storage.cassandra;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
@ -26,11 +26,11 @@ import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;
import io.druid.common.utils.CompressionUtils;
import io.druid.segment.IndexIO;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import java.io.File;
import java.io.FileInputStream;
@ -57,7 +57,13 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
this.jsonMapper=jsonMapper;
}
@Override
@Override
public String getPathForHadoop(String dataSource)
{
throw new UnsupportedOperationException("Cassandra storage does not support indexing via Hadoop");
}
@Override
public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
{
log.info("Writing [%s] to C*", indexFilesDir);
@ -71,7 +77,7 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile);
log.info("Wrote compressed file [%s] to [%s]", compressedIndexFile.getAbsolutePath(), key);
int version = IndexIO.getVersionFromDir(indexFilesDir);
int version = SegmentUtils.getVersionFromDir(indexFilesDir);
try
{

View File

@ -17,13 +17,13 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading.cassandra;
package io.druid.storage.cassandra;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import io.druid.guice.DruidBinders;
import io.druid.guice.Binders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
@ -45,7 +45,7 @@ public class CassandraDruidModule implements DruidModule
@Override
public void configure(Binder binder)
{
DruidBinders.dataSegmentPullerBinder(binder)
Binders.dataSegmentPullerBinder(binder)
.addBinding("c*")
.to(CassandraDataSegmentPuller.class)
.in(LazySingleton.class);

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading.cassandra;
package io.druid.storage.cassandra;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;

View File

@ -1 +1 @@
io.druid.segment.loading.cassandra.CassandraDruidModule
io.druid.storage.cassandra.CassandraDruidModule

View File

@ -1,43 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.common.guava;
/**
*/
public class Runnables
{
public static Runnable threadNaming(final String threadName, final Runnable runnable)
{
return new ThreadRenamingRunnable(threadName)
{
@Override
public void doRun()
{
runnable.run();
}
};
}
public static Runnable getNoopRunnable(){
return new Runnable(){
public void run(){}
};
}
}

View File

@ -1,142 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.common.utils;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.metamx.common.ISE;
import com.metamx.common.StreamUtils;
import com.metamx.common.logger.Logger;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
/**
*/
public class CompressionUtils
{
private static final Logger log = new Logger(CompressionUtils.class);
public static long zip(File directory, File outputZipFile) throws IOException
{
if (!outputZipFile.getName().endsWith(".zip")) {
log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory);
}
final FileOutputStream out = new FileOutputStream(outputZipFile);
try {
final long retVal = zip(directory, out);
out.close();
return retVal;
}
finally {
Closeables.closeQuietly(out);
}
}
public static long zip(File directory, OutputStream out) throws IOException
{
if (!directory.isDirectory()) {
throw new IOException(String.format("directory[%s] is not a directory", directory));
}
long totalSize = 0;
ZipOutputStream zipOut = null;
try {
zipOut = new ZipOutputStream(out);
File[] files = directory.listFiles();
for (File file : files) {
log.info("Adding file[%s] with size[%,d]. Total size so far[%,d]", file, file.length(), totalSize);
if (file.length() >= Integer.MAX_VALUE) {
zipOut.finish();
throw new IOException(String.format("file[%s] too large [%,d]", file, file.length()));
}
zipOut.putNextEntry(new ZipEntry(file.getName()));
totalSize += ByteStreams.copy(Files.newInputStreamSupplier(file), zipOut);
}
zipOut.closeEntry();
}
finally {
if (zipOut != null) {
zipOut.finish();
}
}
return totalSize;
}
public static void unzip(File pulledFile, File outDir) throws IOException
{
if (!(outDir.exists() && outDir.isDirectory())) {
throw new ISE("outDir[%s] must exist and be a directory", outDir);
}
log.info("Unzipping file[%s] to [%s]", pulledFile, outDir);
InputStream in = null;
try {
in = new BufferedInputStream(new FileInputStream(pulledFile));
unzip(in, outDir);
}
finally {
Closeables.closeQuietly(in);
}
}
public static void unzip(InputStream in, File outDir) throws IOException
{
ZipInputStream zipIn = new ZipInputStream(in);
ZipEntry entry;
while ((entry = zipIn.getNextEntry()) != null) {
FileOutputStream out = null;
try {
out = new FileOutputStream(new File(outDir, entry.getName()));
ByteStreams.copy(zipIn, out);
zipIn.closeEntry();
out.close();
}
finally {
Closeables.closeQuietly(out);
}
}
}
public static void gunzip(File pulledFile, File outDir) throws IOException
{
log.info("Gunzipping file[%s] to [%s]", pulledFile, outDir);
StreamUtils.copyToFileAndClose(new GZIPInputStream(new FileInputStream(pulledFile)), outDir);
if (!pulledFile.delete()) {
log.error("Could not delete tmpFile[%s].", pulledFile);
}
}
}
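
This class moves out of the common module in this commit (the import swaps elsewhere in the diff point at io.druid.utils.CompressionUtils). A minimal round-trip usage sketch of the zip/unzip helpers above, assuming the moved class keeps the same signatures; all paths are hypothetical:

import io.druid.utils.CompressionUtils;

import java.io.File;

public class CompressionUtilsExample
{
  public static void main(String[] args) throws Exception
  {
    File segmentDir = new File("/tmp/druid/segment");   // hypothetical; must exist and contain files
    File zipFile = new File("/tmp/druid/segment.zip");
    File unpackDir = new File("/tmp/druid/unpacked");
    unpackDir.mkdirs();                                 // unzip() requires an existing directory

    long bytesZipped = CompressionUtils.zip(segmentDir, zipFile);
    System.out.println("zipped " + bytesZipped + " bytes");

    CompressionUtils.unzip(zipFile, unpackDir);         // restores the directory contents
  }
}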

View File

@ -1,157 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provider;
import com.google.inject.ProvisionException;
import com.google.inject.TypeLiteral;
import com.google.inject.binder.ScopedBindingBuilder;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.util.Types;
import javax.annotation.Nullable;
import java.lang.reflect.ParameterizedType;
import java.util.Map;
import java.util.Properties;
/**
* Provides the ability to create "polymorphic" bindings, where the polymorphism amounts to a decision
* based on a value in a Properties object.
*
* The workflow is that you first create a choice by calling createChoice(). Then you create options using the binder
* returned by the optionBinder() method. Multiple modules can call optionBinder and all options will be
* reflected at injection time, as long as equivalent interface Key objects are passed into the various methods.
*/
public class PolyBind
{
/**
* Sets up a "choice" for the injector to resolve at injection time.
*
* @param binder the binder for the injector that is being configured
* @param property the property that will be checked to determine the implementation choice
* @param interfaceKey the interface that will be injected using this choice
* @param defaultKey the default instance to be injected if the property doesn't match a choice. Can be null
* @param <T> interface type
* @return A ScopedBindingBuilder so that scopes can be added to the binding, if required.
*/
public static <T> ScopedBindingBuilder createChoice(
Binder binder,
String property,
Key<T> interfaceKey,
@Nullable Key<? extends T> defaultKey
)
{
return binder.bind(interfaceKey).toProvider(new ConfiggedProvider<T>(interfaceKey, property, defaultKey));
}
/**
* Binds an option for a specific choice. The choice must already be registered on the injector for this to work.
*
* @param binder the binder for the injector that is being configured
* @param interfaceKey the interface that will have an option added to it. This must equal the
* Key provided to createChoice
* @param <T> interface type
* @return A MapBinder that can be used to create the actual option bindings.
*/
public static <T> MapBinder<String, T> optionBinder(Binder binder, Key<T> interfaceKey)
{
final TypeLiteral<T> interfaceType = interfaceKey.getTypeLiteral();
if (interfaceKey.getAnnotation() != null) {
return MapBinder.newMapBinder(
binder, TypeLiteral.get(String.class), interfaceType, interfaceKey.getAnnotation()
);
}
else if (interfaceKey.getAnnotationType() != null) {
return MapBinder.newMapBinder(
binder, TypeLiteral.get(String.class), interfaceType, interfaceKey.getAnnotationType()
);
}
else {
return MapBinder.newMapBinder(binder, TypeLiteral.get(String.class), interfaceType);
}
}
static class ConfiggedProvider<T> implements Provider<T>
{
private final Key<T> key;
private final String property;
private final Key<? extends T> defaultKey;
private Injector injector;
private Properties props;
ConfiggedProvider(
Key<T> key,
String property,
Key<? extends T> defaultKey
)
{
this.key = key;
this.property = property;
this.defaultKey = defaultKey;
}
@Inject
void configure(Injector injector, Properties props)
{
this.injector = injector;
this.props = props;
}
@Override
@SuppressWarnings("unchecked")
public T get()
{
final ParameterizedType mapType = Types.mapOf(
String.class, Types.newParameterizedType(Provider.class, key.getTypeLiteral().getType())
);
final Map<String, Provider<T>> implsMap;
if (key.getAnnotation() != null) {
implsMap = (Map<String, Provider<T>>) injector.getInstance(Key.get(mapType, key.getAnnotation()));
}
else if (key.getAnnotationType() != null) {
implsMap = (Map<String, Provider<T>>) injector.getInstance(Key.get(mapType, key.getAnnotationType()));
}
else {
implsMap = (Map<String, Provider<T>>) injector.getInstance(Key.get(mapType));
}
final String implName = props.getProperty(property);
final Provider<T> provider = implsMap.get(implName);
if (provider == null) {
if (defaultKey == null) {
throw new ProvisionException(
String.format("Unknown provider[%s] of %s, known options[%s]", implName, key, implsMap.keySet())
);
}
return injector.getInstance(defaultKey);
}
return provider.get();
}
}
}

View File

@ -1,128 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.google.common.collect.Iterables;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Names;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Properties;
/**
*/
public class PolyBindTest
{
private Properties props;
private Injector injector;
public void setUp(Module... modules) throws Exception
{
props = new Properties();
injector = Guice.createInjector(
Iterables.concat(
Arrays.asList(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(Properties.class).toInstance(props);
PolyBind.createChoice(binder, "billy", Key.get(Gogo.class), Key.get(GoA.class));
}
}
),
Arrays.asList(modules)
)
);
}
@Test
public void testSanity() throws Exception
{
setUp(
new Module()
{
@Override
public void configure(Binder binder)
{
final MapBinder<String,Gogo> gogoBinder = PolyBind.optionBinder(binder, Key.get(Gogo.class));
gogoBinder.addBinding("a").to(GoA.class);
gogoBinder.addBinding("b").to(GoB.class);
PolyBind.createChoice(
binder, "billy", Key.get(Gogo.class, Names.named("reverse")), Key.get(GoB.class)
);
final MapBinder<String,Gogo> annotatedGogoBinder = PolyBind.optionBinder(
binder, Key.get(Gogo.class, Names.named("reverse"))
);
annotatedGogoBinder.addBinding("a").to(GoB.class);
annotatedGogoBinder.addBinding("b").to(GoA.class);
}
}
);
Assert.assertEquals("A", injector.getInstance(Gogo.class).go());
Assert.assertEquals("B", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go());
props.setProperty("billy", "b");
Assert.assertEquals("B", injector.getInstance(Gogo.class).go());
Assert.assertEquals("A", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go());
props.setProperty("billy", "a");
Assert.assertEquals("A", injector.getInstance(Gogo.class).go());
Assert.assertEquals("B", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go());
props.setProperty("billy", "b");
Assert.assertEquals("B", injector.getInstance(Gogo.class).go());
Assert.assertEquals("A", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go());
props.setProperty("billy", "c");
Assert.assertEquals("A", injector.getInstance(Gogo.class).go());
Assert.assertEquals("B", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go());
}
public static interface Gogo
{
public String go();
}
public static class GoA implements Gogo
{
@Override
public String go()
{
return "A";
}
}
public static class GoB implements Gogo
{
@Override
public String go()
{
return "B";
}
}
}

View File

@ -63,6 +63,13 @@
<artifactId>twitter4j-stream</artifactId>
<version>3.0.3</version>
</dependency>
<!-- For tests! -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,58 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.druid.initialization.DruidModule;
import java.util.Arrays;
import java.util.List;
/**
*/
public class ExamplesDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return Arrays.<Module>asList(
new SimpleModule("ExamplesModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream")
)
);
}
@Override
public void configure(Binder binder)
{
}
}
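
What this registration buys: once the module is installed on an ObjectMapper, the "type" field of a firehose spec selects the concrete factory. A sketch, assuming druid-api's FirehoseFactory base type carries the usual @JsonTypeInfo(property = "type") annotation; the spec value is illustrative and real factories may require additional fields:

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import druid.examples.ExamplesDruidModule;
import io.druid.data.input.FirehoseFactory;

public class FirehoseSpecExample
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    for (Module module : new ExamplesDruidModule().getJacksonModules()) {
      mapper.registerModule(module);
    }
    // "rand" resolves to RandomFirehoseFactory via the NamedType registered above.
    // Factory-specific @JsonProperty fields would go alongside "type".
    FirehoseFactory factory = mapper.readValue("{\"type\": \"rand\"}", FirehoseFactory.class);
    System.out.println(factory.getClass().getName());
  }
}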

View File

@ -28,7 +28,7 @@ import com.google.common.io.Closeables;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.StringInputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
import java.io.BufferedReader;
import java.io.File;

View File

@ -25,11 +25,11 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Throwables;
import com.metamx.common.parsers.TimestampParser;
import com.metamx.emitter.EmittingLogger;
import io.druid.common.guava.Runnables;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.utils.Runnables;
import org.joda.time.DateTime;
import java.io.IOException;

hdfs-storage/pom.xml (new file)
View File

@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012, 2013 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-hdfs-storage</artifactId>
<name>druid-hdfs-storage</name>
<description>druid-hdfs-storage</description>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.0.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -17,12 +17,14 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
package io.druid.storage.hdfs;
import com.google.common.io.Closeables;
import com.google.inject.Inject;
import io.druid.common.utils.CompressionUtils;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;

View File

@ -17,18 +17,21 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
package io.druid.storage.hdfs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.OutputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.CompressionUtils;
import io.druid.segment.IndexIO;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@ -60,6 +63,17 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
this.jsonMapper = jsonMapper;
}
@Override
public String getPathForHadoop(String dataSource)
{
try {
return new Path(config.getStorageDirectory(), dataSource).makeQualified(FileSystem.get(hadoopConfig)).toString();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@Override
public DataSegment push(File inDir, DataSegment segment) throws IOException
{
@ -85,7 +99,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
return createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outFile))
.withSize(size)
.withBinaryVersion(IndexIO.CURRENT_VERSION_ID),
.withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)),
outFile.getParent(),
fs
);

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
package io.druid.storage.hdfs;
import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -0,0 +1,71 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.hdfs;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import io.druid.guice.Binders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.initialization.DruidModule;
import org.apache.hadoop.conf.Configuration;
import java.util.List;
import java.util.Properties;
/**
*/
public class HdfsStorageDruidModule implements DruidModule
{
private Properties props = null;
@Inject
public void setProperties(Properties props)
{
this.props = props;
}
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
Binders.dataSegmentPullerBinder(binder).addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding("hdfs").to(HdfsDataSegmentPusher.class).in(LazySingleton.class);
final Configuration conf = new Configuration();
if (props != null) {
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
}
binder.bind(Configuration.class).toInstance(conf);
JsonConfigProvider.bind(binder, "druid.storage", HdfsDataSegmentPusherConfig.class);
}
}
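
The loop above strips the hadoop. prefix, so any JVM system property hadoop.<key> surfaces in the Hadoop Configuration as <key>. A standalone sketch of that mapping (property name and value are examples only):

import org.apache.hadoop.conf.Configuration;

public class HadoopPropertyMappingExample
{
  public static void main(String[] args)
  {
    // Example property; any hadoop.* system property is treated the same way.
    System.setProperty("hadoop.fs.default.name", "hdfs://namenode:9000");

    Configuration conf = new Configuration();
    for (String propName : System.getProperties().stringPropertyNames()) {
      if (propName.startsWith("hadoop.")) {
        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
      }
    }
    // Prints hdfs://namenode:9000
    System.out.println(conf.get("fs.default.name"));
  }
}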

View File

@ -0,0 +1 @@
io.druid.storage.hdfs.HdfsStorageDruidModule

View File

@ -75,11 +75,6 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<version>0.7.1</version>
</dependency>
<!-- Tests -->

View File

@ -141,7 +141,7 @@ public class DeterminePartitionsJob implements Jobby
groupByJob.setOutputKeyClass(BytesWritable.class);
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
groupByJob.setJarByClass(DeterminePartitionsJob.class);
JobHelper.setupClasspath(config, groupByJob);
config.addInputPaths(groupByJob);
config.intoConfiguration(groupByJob);
@ -189,9 +189,9 @@ public class DeterminePartitionsJob implements Jobby
dimSelectionJob.setOutputKeyClass(BytesWritable.class);
dimSelectionJob.setOutputValueClass(Text.class);
dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
dimSelectionJob.setJarByClass(DeterminePartitionsJob.class);
dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class);
dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().size());
JobHelper.setupClasspath(config, dimSelectionJob);
config.intoConfiguration(dimSelectionJob);
FileOutputFormat.setOutputPath(dimSelectionJob, config.makeIntermediatePath());
@ -486,7 +486,7 @@ public class DeterminePartitionsJob implements Jobby
private Iterable<DimValueCount> combineRows(Iterable<Text> input)
{
return new CombiningIterable<DimValueCount>(
return new CombiningIterable<>(
Iterables.transform(
input,
new Function<Text, DimValueCount>()
@ -758,14 +758,19 @@ public class DeterminePartitionsJob implements Jobby
log.info("Chosen partitions:");
for (ShardSpec shardSpec : chosenShardSpecs) {
log.info(" %s", shardSpec);
log.info(" %s", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(shardSpec));
}
System.out.println(HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(chosenShardSpecs));
try {
HadoopDruidIndexerConfig.jsonMapper.writerWithType(new TypeReference<List<ShardSpec>>() {}).writeValue(
out,
chosenShardSpecs
);
HadoopDruidIndexerConfig.jsonMapper
.writerWithType(
new TypeReference<List<ShardSpec>>()
{
}
)
.writeValue(out, chosenShardSpecs);
}
finally {
Closeables.close(out, false);

View File

@ -21,7 +21,6 @@ package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
@ -33,24 +32,31 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.DataSpec;
import io.druid.data.input.InputRow;
import io.druid.data.input.StringInputRowParser;
import io.druid.data.input.TimestampSpec;
import io.druid.data.input.ToLowercaseDataSpec;
import io.druid.data.input.impl.DataSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.data.input.impl.ToLowercaseDataSpec;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.indexer.granularity.GranularitySpec;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.initialization.Initialization;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec;
import org.apache.hadoop.conf.Configuration;
@ -78,11 +84,28 @@ public class HadoopDruidIndexerConfig
public static final Splitter tabSplitter = Splitter.on("\t");
public static final Joiner tabJoiner = Joiner.on("\t");
private static final Injector injector;
public static final ObjectMapper jsonMapper;
static {
jsonMapper = new DefaultObjectMapper();
jsonMapper.configure(JsonGenerator.Feature.ESCAPE_NON_ASCII, true);
injector = Initialization.makeInjectorWithModules(
Initialization.makeStartupInjector(),
ImmutableList.<Object>of(
new Module()
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bindInstance(
binder, Key.get(DruidNode.class, Self.class), new DruidNode("hadoop-indexer", "localhost", -1)
);
}
}
)
);
jsonMapper = injector.getInstance(ObjectMapper.class);
}
public static enum IndexJobCounters

View File

@ -21,7 +21,7 @@ package io.druid.indexer;
import com.metamx.common.RE;
import io.druid.data.input.InputRow;
import io.druid.data.input.StringInputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

View File

@ -33,12 +33,13 @@ import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.StringInputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.timeline.DataSegment;
@ -136,7 +137,7 @@ public class IndexGeneratorJob implements Jobby
config.addInputPaths(job);
config.intoConfiguration(job);
job.setJarByClass(IndexGeneratorJob.class);
JobHelper.setupClasspath(config, job);
job.submit();
log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
@ -446,7 +447,7 @@ public class IndexGeneratorJob implements Jobby
dimensionNames,
metricNames,
config.getShardSpec(bucket).getActualSpec(),
IndexIO.getVersionFromDir(mergedBase),
SegmentUtils.getVersionFromDir(mergedBase),
size
);

View File

@ -0,0 +1,97 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.google.api.client.util.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
import com.metamx.common.logger.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Set;
/**
*/
public class JobHelper
{
private static final Logger log = new Logger(JobHelper.class);
private static final Set<Path> existing = Sets.newHashSet();
public static void setupClasspath(
HadoopDruidIndexerConfig config,
Job groupByJob
)
throws IOException
{
String classpathProperty = System.getProperty("druid.hadoop.internal.classpath");
if (classpathProperty == null) {
classpathProperty = System.getProperty("java.class.path");
}
String[] jarFiles = classpathProperty.split(File.pathSeparator);
final Configuration conf = groupByJob.getConfiguration();
final FileSystem fs = FileSystem.get(conf);
Path distributedClassPath = new Path(config.getJobOutputDir(), "classpath");
if (fs instanceof LocalFileSystem) {
return;
}
for (String jarFilePath : jarFiles) {
File jarFile = new File(jarFilePath);
if (jarFile.getName().endsWith(".jar")) {
final Path hdfsPath = new Path(distributedClassPath, jarFile.getName());
if (! existing.contains(hdfsPath)) {
if (jarFile.getName().endsWith("SNAPSHOT.jar") || !fs.exists(hdfsPath)) {
log.info("Uploading jar to path[%s]", hdfsPath);
ByteStreams.copy(
Files.newInputStreamSupplier(jarFile),
new OutputSupplier<OutputStream>()
{
@Override
public OutputStream getOutput() throws IOException
{
return fs.create(hdfsPath);
}
}
);
}
existing.add(hdfsPath);
}
DistributedCache.addFileToClassPath(hdfsPath, conf, fs);
}
}
}
}
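
setupClasspath takes its jar list from druid.hadoop.internal.classpath when that system property is set, falling back to java.class.path, and only ships entries ending in .jar. A standalone sketch of just that selection step, with hypothetical paths:

import java.io.File;

public class ClasspathSelectionExample
{
  public static void main(String[] args)
  {
    // Hypothetical override: only these jars would be considered for upload.
    System.setProperty(
        "druid.hadoop.internal.classpath",
        "/opt/druid/lib/druid-indexing-hadoop.jar" + File.pathSeparator + "/opt/druid/lib/guava.jar"
    );

    String classpathProperty = System.getProperty("druid.hadoop.internal.classpath");
    if (classpathProperty == null) {
      classpathProperty = System.getProperty("java.class.path");
    }
    for (String jarFilePath : classpathProperty.split(File.pathSeparator)) {
      if (new File(jarFilePath).getName().endsWith(".jar")) {
        System.out.println("would upload: " + jarFilePath);
      }
    }
  }
}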

View File

@ -130,10 +130,6 @@
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
@ -163,5 +159,10 @@
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -32,7 +32,7 @@ import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapInputRowParser;
import io.druid.data.input.impl.MapInputRowParser;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

View File

@ -35,6 +35,7 @@ import io.druid.query.QueryRunner;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
@ -150,7 +151,7 @@ public class YeOldePlumberSchool implements PlumberSchool
final DataSegment segmentToUpload = theSink.getSegment()
.withDimensions(ImmutableList.copyOf(mappedSegment.getAvailableDimensions()))
.withBinaryVersion(IndexIO.getVersionFromDir(fileToUpload));
.withBinaryVersion(SegmentUtils.getVersionFromDir(fileToUpload));
dataSegmentPusher.push(fileToUpload, segmentToUpload);

View File

@ -32,9 +32,7 @@ import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.segment.loading.S3DataSegmentPusher;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
@ -102,21 +100,7 @@ public class HadoopIndexTask extends AbstractTask
// Set workingPath to some reasonable default
configCopy.setJobOutputDir(toolbox.getConfig().getHadoopWorkingPath());
if (toolbox.getSegmentPusher() instanceof S3DataSegmentPusher) {
// Hack alert! Bypassing DataSegmentPusher...
S3DataSegmentPusher segmentPusher = (S3DataSegmentPusher) toolbox.getSegmentPusher();
String s3Path = String.format(
"s3n://%s/%s/%s",
segmentPusher.getConfig().getBucket(),
segmentPusher.getConfig().getBaseKey(),
getDataSource()
);
log.info("Setting segment output path to: %s", s3Path);
configCopy.setSegmentOutputDir(s3Path);
} else {
throw new IllegalStateException("Sorry, we only work with S3DataSegmentPushers! Bummer!");
}
configCopy.setSegmentOutputDir(toolbox.getSegmentPusher().getPathForHadoop(getDataSource()));
HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(configCopy);
configCopy.verify();
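
This hunk is the heart of the PR: instead of downcasting to S3DataSegmentPusher, the task now asks whatever DataSegmentPusher is configured for its Hadoop output location. A hypothetical local-filesystem pusher, sketched only to show the contract the task now relies on (not part of this commit):

import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;

import java.io.File;
import java.io.IOException;

// Hypothetical example only: the two methods a deep-storage implementation now provides.
public class LocalDataSegmentPusher implements DataSegmentPusher
{
  private final File baseDir = new File("/tmp/druid/deepstore");

  @Override
  public String getPathForHadoop(String dataSource)
  {
    // Hadoop indexing jobs write their segment output under this location.
    return new File(baseDir, dataSource).toURI().toString();
  }

  @Override
  public DataSegment push(File indexFilesDir, DataSegment segment) throws IOException
  {
    // A real implementation would zip indexFilesDir into deep storage and
    // return the segment with its loadSpec, size, and binary version set.
    throw new UnsupportedOperationException("sketch only");
  }
}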

View File

@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
@ -33,7 +32,6 @@ import io.druid.data.input.InputRow;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.segment.loading.DataSegmentPusher;
@ -48,7 +46,6 @@ import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
public class IndexGeneratorTask extends AbstractTask
@ -126,6 +123,12 @@ public class IndexGeneratorTask extends AbstractTask
final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<DataSegment>();
final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher()
{
@Override
public String getPathForHadoop(String dataSource)
{
return toolbox.getSegmentPusher().getPathForHadoop(dataSource);
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{

View File

@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.granularity.GranularitySpec;
import io.druid.indexing.common.TaskStatus;
@ -33,7 +34,6 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SpawnTasksAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.incremental.SpatialDimensionSchema;
import io.druid.segment.realtime.Schema;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;

View File

@ -1,45 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.io.InputSupplier;
import com.metamx.common.logger.Logger;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
public class NoopTaskLogs implements TaskLogs
{
private final Logger log = new Logger(NoopTaskLogs.class);
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(String taskid, long offset) throws IOException
{
return Optional.absent();
}
@Override
public void pushTaskLog(String taskid, File logFile) throws IOException
{
log.info("Not pushing logs for task: %s", taskid);
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import io.druid.tasklogs.TaskLogStreamer;
import java.io.IOException;
import java.io.InputStream;

View File

@ -1,31 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.tasklogs;
import java.io.File;
import java.io.IOException;
/**
* Something that knows how to persist local task logs to some form of long-term storage.
*/
public interface TaskLogPusher
{
public void pushTaskLog(String taskid, File logFile) throws IOException;
}

View File

@ -1,42 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.io.InputSupplier;
import java.io.IOException;
import java.io.InputStream;
/**
* Something that knows how to stream logs for tasks.
*/
public interface TaskLogStreamer
{
/**
* Stream log for a task.
*
* @param offset If zero, stream the entire log. If positive, attempt to read from this position onwards. If
* negative, attempt to read this many bytes from the end of the file (like <tt>tail -c</tt>).
*
* @return input supplier for this log, if available from this provider
*/
public Optional<InputSupplier<InputStream>> streamTaskLog(String taskid, long offset) throws IOException;
}
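
The interface moves to io.druid.tasklogs in this commit (see the import changes below). One way to honor the offset contract above for logs kept on local disk, sketched with a hypothetical directory layout:

import com.google.common.base.Optional;
import com.google.common.io.InputSupplier;
import io.druid.tasklogs.TaskLogStreamer;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;

// Hypothetical file-based streamer, shown only to illustrate the
// zero/positive/negative offset semantics documented above.
public class LocalTaskLogStreamer implements TaskLogStreamer
{
  private final File logDir = new File("/tmp/druid/task-logs");

  @Override
  public Optional<InputSupplier<InputStream>> streamTaskLog(String taskid, final long offset) throws IOException
  {
    final File logFile = new File(logDir, taskid + ".log");
    if (!logFile.exists()) {
      return Optional.absent();
    }
    return Optional.<InputSupplier<InputStream>>of(
        new InputSupplier<InputStream>()
        {
          @Override
          public InputStream getInput() throws IOException
          {
            RandomAccessFile raf = new RandomAccessFile(logFile, "r");
            // offset == 0: whole log; > 0: from that position; < 0: last |offset| bytes.
            long start = offset >= 0 ? offset : Math.max(raf.length() + offset, 0);
            raf.seek(start);
            return Channels.newInputStream(raf.getChannel());
          }
        }
    );
  }
}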

View File

@ -1,24 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.tasklogs;
public interface TaskLogs extends TaskLogStreamer, TaskLogPusher
{
}

View File

@ -24,6 +24,7 @@ import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.tasklogs.TaskLogStreamer;
import java.io.IOException;
import java.io.InputStream;

View File

@ -43,10 +43,10 @@ import com.metamx.emitter.EmittingLogger;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.tasklogs.TaskLogPusher;
import io.druid.indexing.common.tasklogs.TaskLogStreamer;
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import io.druid.server.DruidNode;
import io.druid.tasklogs.TaskLogPusher;
import io.druid.tasklogs.TaskLogStreamer;
import org.apache.commons.io.FileUtils;
import java.io.File;

View File

@ -22,9 +22,9 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.tasklogs.TaskLogPusher;
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import io.druid.server.DruidNode;
import io.druid.tasklogs.TaskLogPusher;
import java.util.Properties;

View File

@ -45,12 +45,12 @@ import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.tasklogs.TaskLogStreamer;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.tasklogs.TaskLogStreamer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;

View File

@ -21,9 +21,9 @@ package io.druid.indexing.overlord.http;
import com.google.inject.Inject;
import io.druid.common.config.JacksonConfigManager;
import io.druid.indexing.common.tasklogs.TaskLogStreamer;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.tasklogs.TaskLogStreamer;
import javax.ws.rs.Path;

View File

@ -31,7 +31,6 @@ import io.druid.common.config.JacksonConfigManager;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.tasklogs.TaskLogStreamer;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
import io.druid.indexing.overlord.TaskRunner;
@ -39,6 +38,7 @@ import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment;
import javax.ws.rs.Consumes;

View File

@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
import io.druid.data.input.JSONDataSpec;
import io.druid.data.input.impl.JSONDataSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.granularity.UniformGranularitySpec;

View File

@ -127,6 +127,12 @@ public class TaskLifecycleTest
newMockEmitter(),
new DataSegmentPusher()
{
@Override
public String getPathForHadoop(String dataSource)
{
throw new UnsupportedOperationException();
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{

pom.xml
View File

@ -47,11 +47,12 @@
<module>examples</module>
<module>indexing-hadoop</module>
<module>indexing-service</module>
<module>realtime</module>
<module>server</module>
<module>services</module>
<module>processing</module>
<module>cassandra-storage</module>
<module>hdfs-storage</module>
<module>s3-extensions</module>
</modules>
<dependencyManagement>
@ -59,7 +60,7 @@
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>0.1.0-SNAPSHOT</version>
</dependency>
<!-- Compile Scope -->
@ -122,7 +123,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.3.27</version>
<version>1.6.0.1</version>
<exclusions>
<exclusion>
<groupId>javax.mail</groupId>
@ -146,7 +147,7 @@
<dependency>
<groupId>io.airlift</groupId>
<artifactId>airline</artifactId>
<version>0.5</version>
<version>0.6</version>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
@ -173,17 +174,6 @@
<artifactId>curator-x-discovery</artifactId>
<version>${apache.curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.2</version>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api-2.5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId>
@ -299,11 +289,6 @@
<artifactId>jersey-server</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<version>0.8.1</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
@ -394,8 +379,28 @@
<artifactId>aether-api</artifactId>
<version>0.9.0.M2</version>
</dependency>
<dependency>
<groupId>kafka</groupId>
<artifactId>core-kafka</artifactId>
<version>0.7.2-mmx1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.0.3</version>
<scope>provided</scope>
</dependency>
<!-- Test Scope -->
<dependency>

View File

@ -1,33 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.data.input;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.nio.ByteBuffer;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "protobuf", value = ProtoBufInputRowParser.class),
@JsonSubTypes.Type(name = "string", value = StringInputRowParser.class)
})
public interface ByteBufferInputRowParser extends InputRowParser<ByteBuffer> {
}

View File

@ -1,98 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.data.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.parsers.CSVParser;
import com.metamx.common.parsers.Parser;
import io.druid.segment.incremental.SpatialDimensionSchema;
import java.util.List;
/**
*/
public class CSVDataSpec implements DataSpec
{
private final List<String> columns;
private final List<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
@JsonCreator
public CSVDataSpec(
@JsonProperty("columns") List<String> columns,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
)
{
Preconditions.checkNotNull(columns, "columns");
for (String column : columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] must not contain a comma", column);
}
this.columns = columns;
this.dimensions = dimensions;
this.spatialDimensions = (spatialDimensions == null)
? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
}
@JsonProperty("columns")
public List<String> getColumns()
{
return columns;
}
@JsonProperty("dimensions")
@Override
public List<String> getDimensions()
{
return dimensions;
}
@JsonProperty("spatialDimensions")
@Override
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
@Override
public void verify(List<String> usedCols)
{
for (String columnName : usedCols) {
Preconditions.checkArgument(columns.contains(columnName), "column[%s] not in columns.", columnName);
}
}
@Override
public boolean hasCustomDimensions()
{
return !(dimensions == null || dimensions.isEmpty());
}
@Override
public Parser<String, Object> getParser()
{
return new CSVParser(columns);
}
}
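CSVDataSpec simply hands its column list to a CSVParser. A minimal usage sketch, assuming (as StringInputRowParser's use of Parser below implies) that parse(String) returns a Map<String, Object> keyed by column name, with values arriving as strings; the column layout here is hypothetical.

import com.google.common.collect.ImmutableList;
import java.util.Map;

DataSpec spec = new CSVDataSpec(
    ImmutableList.of("timestamp", "page", "count"), // columns, in file order
    ImmutableList.of("page"),                       // dimensions
    null                                            // no spatial dimensions
);
Map<String, Object> row = spec.getParser().parse("2013-08-31T01:02:33Z,Gypsy Danger,1");
// row -> {timestamp=2013-08-31T01:02:33Z, page=Gypsy Danger, count=1}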

View File

@ -1,48 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.data.input;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.common.parsers.Parser;
import io.druid.segment.incremental.SpatialDimensionSchema;
import java.util.List;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "format", defaultImpl = DelimitedDataSpec.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "json", value = JSONDataSpec.class),
@JsonSubTypes.Type(name = "csv", value = CSVDataSpec.class),
@JsonSubTypes.Type(name = "tsv", value = DelimitedDataSpec.class)
})
public interface DataSpec
{
public void verify(List<String> usedCols);
public boolean hasCustomDimensions();
public List<String> getDimensions();
public List<SpatialDimensionSchema> getSpatialDimensions();
public Parser<String, Object> getParser();
}
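Note the defaultImpl above: a spec with no "format" property deserializes as DelimitedDataSpec, i.e. tab-separated input. A short sketch, reusing the mapper from the earlier parser sketch; field values are hypothetical.

// With "format" absent, Jackson falls back to DelimitedDataSpec using the
// default delimiter. "format":"csv" or "format":"json" would pick the others.
DataSpec spec = mapper.readValue(
    "{\"columns\":[\"ts\",\"page\"],\"dimensions\":[\"page\"]}",
    DataSpec.class
);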

View File

@ -1,109 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.data.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.parsers.DelimitedParser;
import com.metamx.common.parsers.Parser;
import io.druid.segment.incremental.SpatialDimensionSchema;
import java.util.List;
/**
*/
public class DelimitedDataSpec implements DataSpec
{
private final String delimiter;
private final List<String> columns;
private final List<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
@JsonCreator
public DelimitedDataSpec(
@JsonProperty("delimiter") String delimiter,
@JsonProperty("columns") List<String> columns,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
)
{
Preconditions.checkNotNull(columns, "columns");
for (String column : columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] must not contain a comma", column);
}
this.delimiter = (delimiter == null) ? DelimitedParser.DEFAULT_DELIMITER : delimiter;
this.columns = columns;
this.dimensions = dimensions;
this.spatialDimensions = (spatialDimensions == null)
? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
}
@JsonProperty("delimiter")
public String getDelimiter()
{
return delimiter;
}
@JsonProperty("columns")
public List<String> getColumns()
{
return columns;
}
@JsonProperty("dimensions")
@Override
public List<String> getDimensions()
{
return dimensions;
}
@JsonProperty("spatialDimensions")
@Override
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
@Override
public void verify(List<String> usedCols)
{
for (String columnName : usedCols) {
Preconditions.checkArgument(columns.contains(columnName), "column[%s] not in columns.", columnName);
}
}
@Override
public boolean hasCustomDimensions()
{
return !(dimensions == null || dimensions.isEmpty());
}
@Override
public Parser<String, Object> getParser()
{
Parser<String, Object> retVal = new DelimitedParser(delimiter);
retVal.setFieldNames(columns);
return retVal;
}
}

View File

@ -1,28 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.data.input;
import com.metamx.common.exception.FormattedException;
public interface InputRowParser<T>
{
public InputRow parse(T input) throws FormattedException;
public void addDimensionExclusion(String dimension);
}

View File

@ -1,80 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.data.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.metamx.common.parsers.JSONParser;
import com.metamx.common.parsers.Parser;
import io.druid.segment.incremental.SpatialDimensionSchema;
import java.util.List;
/**
*/
public class JSONDataSpec implements DataSpec
{
private final List<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
@JsonCreator
public JSONDataSpec(
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
)
{
this.dimensions = dimensions;
this.spatialDimensions = (spatialDimensions == null)
? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
}
@JsonProperty("dimensions")
@Override
public List<String> getDimensions()
{
return dimensions;
}
@JsonProperty("spatialDimensions")
@Override
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
@Override
public void verify(List<String> usedCols)
{
}
@Override
public boolean hasCustomDimensions()
{
return !(dimensions == null || dimensions.isEmpty());
}
@Override
public Parser<String, Object> getParser()
{
return new JSONParser();
}
}

View File

@ -1,117 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.data.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.exception.FormattedException;
import org.joda.time.DateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MapInputRowParser implements InputRowParser<Map<String, Object>>
{
private final TimestampSpec timestampSpec;
private List<String> dimensions;
private final Set<String> dimensionExclusions;
@JsonCreator
public MapInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions
)
{
this.timestampSpec = timestampSpec;
if (dimensions != null) {
this.dimensions = ImmutableList.copyOf(dimensions);
}
this.dimensionExclusions = Sets.newHashSet();
if (dimensionExclusions != null) {
for (String dimensionExclusion : dimensionExclusions) {
this.dimensionExclusions.add(dimensionExclusion.toLowerCase());
}
}
this.dimensionExclusions.add(timestampSpec.getTimestampColumn().toLowerCase());
}
@Override
public InputRow parse(Map<String, Object> theMap) throws FormattedException
{
final List<String> dimensions = hasCustomDimensions()
? this.dimensions
: Lists.newArrayList(Sets.difference(theMap.keySet(), dimensionExclusions));
final DateTime timestamp;
try {
timestamp = timestampSpec.extractTimestamp(theMap);
if (timestamp == null) {
final String input = theMap.toString();
throw new NullPointerException(
String.format(
"Null timestamp in input: %s",
input.length() < 100 ? input : input.substring(0, 100) + "..."
)
);
}
}
catch (Exception e) {
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_TIMESTAMP)
.withMessage(e.toString())
.build();
}
return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap);
}
private boolean hasCustomDimensions() {
return dimensions != null;
}
@Override
public void addDimensionExclusion(String dimension)
{
dimensionExclusions.add(dimension);
}
@JsonProperty
public TimestampSpec getTimestampSpec()
{
return timestampSpec;
}
@JsonProperty
public List<String> getDimensions()
{
return dimensions;
}
@JsonProperty
public Set<String> getDimensionExclusions()
{
return dimensionExclusions;
}
}
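The key behavior here: when no dimension list is supplied, parse() treats every map key as a dimension except the exclusions and the timestamp column itself. A minimal sketch, mirroring the serde test further down in this diff; the field values are hypothetical.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

MapInputRowParser parser = new MapInputRowParser(
    new TimestampSpec("timestamp", "iso"),
    null,                          // no custom dimensions: derive them per row
    ImmutableList.of("count")      // never treat "count" as a dimension
);
InputRow row = parser.parse(
    ImmutableMap.<String, Object>of(
        "timestamp", "2013-09-01T12:00:00Z",
        "page", "Gypsy Danger",
        "count", 7
    )
);
// row.getDimensions() -> ["page"]; "count" and "timestamp" are excluded.
// (Sets.difference drives this, so dimension order is not guaranteed.)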

View File

@ -29,6 +29,8 @@ import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import java.io.InputStream;
import java.nio.ByteBuffer;

View File

@ -1,137 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.data.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ToLowerCaseParser;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.List;
import java.util.Map;
/**
*/
public class StringInputRowParser implements ByteBufferInputRowParser
{
private final MapInputRowParser inputRowCreator;
private final Parser<String, Object> parser;
private final DataSpec dataSpec;
private CharBuffer chars = null;
@JsonCreator
public StringInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("data") DataSpec dataSpec,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions)
{
this.dataSpec = dataSpec;
this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec.getDimensions(), dimensionExclusions);
this.parser = new ToLowerCaseParser(dataSpec.getParser());
}
public void addDimensionExclusion(String dimension)
{
inputRowCreator.addDimensionExclusion(dimension);
}
@Override
public InputRow parse(ByteBuffer input) throws FormattedException
{
return parseMap(buildStringKeyMap(input));
}
private Map<String, Object> buildStringKeyMap(ByteBuffer input)
{
int payloadSize = input.remaining();
if (chars == null || chars.remaining() < payloadSize)
{
chars = CharBuffer.allocate(payloadSize);
}
final CoderResult coderResult = Charsets.UTF_8.newDecoder()
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE)
.decode(input, chars, true);
Map<String, Object> theMap;
if (coderResult.isUnderflow())
{
chars.flip();
try
{
theMap = parseString(chars.toString());
} finally
{
chars.clear();
}
}
else
{
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Failed with CoderResult[%s]", coderResult))
.build();
}
return theMap;
}
private Map<String, Object> parseString(String inputString)
{
return parser.parse(inputString);
}
public InputRow parse(String input) throws FormattedException
{
return parseMap(parseString(input));
}
private InputRow parseMap(Map<String, Object> theMap)
{
return inputRowCreator.parse(theMap);
}
@JsonProperty
public TimestampSpec getTimestampSpec()
{
return inputRowCreator.getTimestampSpec();
}
@JsonProperty("data")
public DataSpec getDataSpec()
{
return dataSpec;
}
@JsonProperty
public List<String> getDimensionExclusions()
{
return ImmutableList.copyOf(inputRowCreator.getDimensionExclusions());
}
}
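Worth noting about buildStringKeyMap above: the parser reuses a single CharBuffer across rows and decodes with CodingErrorAction.REPLACE, so malformed UTF-8 is substituted rather than fatal. A minimal sketch of feeding it a raw buffer, with hypothetical field values:

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import io.druid.segment.incremental.SpatialDimensionSchema;
import java.nio.ByteBuffer;

StringInputRowParser parser = new StringInputRowParser(
    new TimestampSpec("timestamp", "iso"),
    new JSONDataSpec(ImmutableList.of("foo", "bar"), ImmutableList.<SpatialDimensionSchema>of()),
    null // no extra dimension exclusions
);
// Bad byte sequences in the buffer would be replaced, not thrown, per the
// decoder settings in buildStringKeyMap.
InputRow row = parser.parse(
    ByteBuffer.wrap("{\"foo\":\"x\",\"timestamp\":\"2000\"}".getBytes(Charsets.UTF_8))
);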

View File

@ -1,69 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.data.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.metamx.common.parsers.ParserUtils;
import org.joda.time.DateTime;
import java.util.Map;
/**
*/
public class TimestampSpec
{
private static final String defaultFormat = "auto";
private final String timestampColumn;
private final String timestampFormat;
private final Function<String, DateTime> timestampConverter;
@JsonCreator
public TimestampSpec(
@JsonProperty("column") String timestampColumn,
@JsonProperty("format") String format
)
{
this.timestampColumn = timestampColumn;
this.timestampFormat = format == null ? defaultFormat : format;
this.timestampConverter = ParserUtils.createTimestampParser(timestampFormat);
}
@JsonProperty("column")
public String getTimestampColumn()
{
return timestampColumn;
}
@JsonProperty("format")
public String getTimestampFormat()
{
return timestampFormat;
}
public DateTime extractTimestamp(Map<String, Object> input)
{
final Object o = input.get(timestampColumn);
return o == null ? null : timestampConverter.apply(o.toString());
}
}
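A short usage sketch; passing null for the format selects "auto", which leaves format detection to ParserUtils.createTimestampParser. The column name and value here are hypothetical.

import com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;

TimestampSpec spec = new TimestampSpec("ts", "iso");
DateTime when = spec.extractTimestamp(
    ImmutableMap.<String, Object>of("ts", "2013-09-01T00:00:00Z")
);
// extractTimestamp returns null when the "ts" key is absent from the input.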

View File

@ -1,77 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.data.input;
import com.fasterxml.jackson.annotation.JsonValue;
import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ToLowerCaseParser;
import io.druid.segment.incremental.SpatialDimensionSchema;
import java.util.List;
/**
*/
public class ToLowercaseDataSpec implements DataSpec
{
private final DataSpec delegate;
public ToLowercaseDataSpec(
DataSpec delegate
)
{
this.delegate = delegate;
}
@Override
public void verify(List<String> usedCols)
{
delegate.verify(usedCols);
}
@Override
public boolean hasCustomDimensions()
{
return delegate.hasCustomDimensions();
}
@Override
public List<String> getDimensions()
{
return delegate.getDimensions();
}
@Override
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return delegate.getSpatialDimensions();
}
@Override
public Parser<String, Object> getParser()
{
return new ToLowerCaseParser(delegate.getParser());
}
@JsonValue
public DataSpec getDelegate()
{
return delegate;
}
}

View File

@ -125,7 +125,7 @@ public class IndexIO
public static QueryableIndex loadIndex(File inDir) throws IOException
{
init();
final int version = getVersionFromDir(inDir);
final int version = SegmentUtils.getVersionFromDir(inDir);
final IndexLoader loader = indexLoaders.get(version);
@ -187,7 +187,7 @@ public class IndexIO
public static boolean convertSegment(File toConvert, File converted) throws IOException
{
final int version = getVersionFromDir(toConvert);
final int version = SegmentUtils.getVersionFromDir(toConvert);
switch (version) {
case 1:

View File

@ -36,6 +36,7 @@ import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Maps;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.Rowboat;

View File

@ -22,6 +22,7 @@ package io.druid.segment.incremental;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;

View File

@ -30,6 +30,7 @@ import com.google.common.collect.Sets;
import com.google.common.primitives.Floats;
import com.metamx.common.ISE;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.SpatialDimensionSchema;
import java.util.Arrays;
import java.util.List;

View File

@ -1,67 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.incremental;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.util.List;
/**
*/
public class SpatialDimensionSchema
{
private final String dimName;
private final List<String> dims;
@JsonCreator
public SpatialDimensionSchema(
@JsonProperty("dimName") String dimName,
@JsonProperty("dims") List<String> dims
)
{
this.dimName = dimName.toLowerCase();
this.dims = Lists.transform(
dims,
new Function<String, String>()
{
@Override
public String apply(String input)
{
return input.toLowerCase();
}
}
);
}
@JsonProperty
public String getDimName()
{
return dimName;
}
@JsonProperty
public List<String> getDims()
{
return dims;
}
}

View File

@ -1,88 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.data.input;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.incremental.SpatialDimensionSchema;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.junit.Test;
import java.nio.ByteBuffer;
public class InputRowParserSerdeTest
{
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testStringInputRowParserSerde() throws Exception
{
final StringInputRowParser parser = new StringInputRowParser(
new TimestampSpec("timestamp", "iso"),
new JSONDataSpec(
ImmutableList.of("foo", "bar"), ImmutableList.<SpatialDimensionSchema>of()
),
ImmutableList.of("baz")
);
final ByteBufferInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsBytes(parser),
ByteBufferInputRowParser.class
);
final InputRow parsed = parser2.parse(
ByteBuffer.wrap(
"{\"foo\":\"x\",\"bar\":\"y\",\"qux\":\"z\",\"timestamp\":\"2000\"}".getBytes(Charsets.UTF_8)
)
);
Assert.assertEquals(ImmutableList.of("foo", "bar"), parsed.getDimensions());
Assert.assertEquals(ImmutableList.of("x"), parsed.getDimension("foo"));
Assert.assertEquals(ImmutableList.of("y"), parsed.getDimension("bar"));
Assert.assertEquals(new DateTime("2000").getMillis(), parsed.getTimestampFromEpoch());
}
@Test
public void testMapInputRowParserSerde() throws Exception
{
final MapInputRowParser parser = new MapInputRowParser(
new TimestampSpec("timestamp", "iso"),
ImmutableList.of("foo", "bar"),
ImmutableList.of("baz")
);
final MapInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsBytes(parser),
MapInputRowParser.class
);
final InputRow parsed = parser2.parse(
ImmutableMap.<String, Object>of(
"foo", "x",
"bar", "y",
"qux", "z",
"timestamp", "2000"
)
);
Assert.assertEquals(ImmutableList.of("foo", "bar"), parsed.getDimensions());
Assert.assertEquals(ImmutableList.of("x"), parsed.getDimension("foo"));
Assert.assertEquals(ImmutableList.of("y"), parsed.getDimension("bar"));
Assert.assertEquals(new DateTime("2000").getMillis(), parsed.getTimestampFromEpoch());
}
}

View File

@ -19,6 +19,7 @@
package io.druid.data.input;
import io.druid.data.input.impl.TimestampSpec;
import org.joda.time.DateTime;
import org.junit.Test;

View File

@ -25,9 +25,9 @@ import com.google.common.io.CharStreams;
import com.google.common.io.InputSupplier;
import com.google.common.io.LineProcessor;
import com.metamx.common.logger.Logger;
import io.druid.data.input.DelimitedDataSpec;
import io.druid.data.input.StringInputRowParser;
import io.druid.data.input.TimestampSpec;
import io.druid.data.input.impl.DelimitedDataSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;

View File

@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
import com.metamx.collections.spatial.search.RadiusBound;
import com.metamx.collections.spatial.search.RectangularBound;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
@ -46,7 +47,6 @@ import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.SpatialDimensionSchema;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;

View File

@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
import com.metamx.collections.spatial.search.RadiusBound;
import com.metamx.collections.spatial.search.RectangularBound;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
@ -46,7 +47,6 @@ import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.SpatialDimensionSchema;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;

View File

@ -1,175 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012, 2013 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-realtime</artifactId>
<name>druid-realtime</name>
<description>druid-realtime</description>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>
</dependency>
<dependency>
<groupId>org.apache.directory.studio</groupId>
<artifactId>org.apache.commons.collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>kafka</groupId>
<artifactId>core-kafka</artifactId>
<version>0.7.2-mmx1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.1.1</version>
</dependency>
<!-- Dependencies required for jets3t b/c emr pom doesn't include them -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.jamesmurty.utils</groupId>
<artifactId>java-xmlbuilder</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.ircclouds.irc</groupId>
<artifactId>irc-api</artifactId>
</dependency>
<dependency>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
</dependency>
<!-- Dependencies required for jets3t -->
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -1,44 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.S3DataSegmentPusher;
import io.druid.segment.loading.S3DataSegmentPusherConfig;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
/**
* A placeholder class to make the move of the SegmentPushers to a new package backwards compatible
*
* Exists in 0.2; can be removed from 0.3 on
*/
@Deprecated
public class S3SegmentPusher extends S3DataSegmentPusher implements DataSegmentPusher
{
public S3SegmentPusher(
RestS3Service s3Client,
S3DataSegmentPusherConfig config,
ObjectMapper jsonMapper
)
{
super(s3Client, config, jsonMapper);
}
}

View File

@ -1,96 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime.firehose;
import com.google.common.base.Throwables;
import io.druid.common.guava.Runnables;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.data.input.StringInputRowParser;
import org.apache.commons.io.LineIterator;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
/**
*/
public class FileIteratingFirehose implements Firehose
{
private final Iterator<LineIterator> lineIterators;
private final StringInputRowParser parser;
private LineIterator lineIterator = null;
public FileIteratingFirehose(
Iterator<LineIterator> lineIterators,
StringInputRowParser parser
)
{
this.lineIterators = lineIterators;
this.parser = parser;
}
@Override
public boolean hasMore()
{
try {
return lineIterators.hasNext() || (lineIterator != null && lineIterator.hasNext());
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public InputRow nextRow()
{
try {
if (lineIterator == null || !lineIterator.hasNext()) {
// Close the previous iterator, if there was one, before advancing.
if (lineIterator != null) {
lineIterator.close();
}
lineIterator = lineIterators.next();
}
return parser.parse(lineIterator.next());
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close() throws IOException
{
if (lineIterator != null) {
lineIterator.close();
}
}
}
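A draining sketch for this firehose, assuming a StringInputRowParser built as in the earlier sketches; the file name is hypothetical. FileUtils.lineIterator throws IOException, so this belongs in a method that handles it.

import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import java.io.File;
import java.util.Iterator;

Iterator<LineIterator> lines = ImmutableList.of(
    FileUtils.lineIterator(new File("events-1.json"), "UTF-8")
).iterator();
Firehose firehose = new FileIteratingFirehose(lines, parser);
try {
  while (firehose.hasMore()) {
    InputRow row = firehose.nextRow();
    // hand the row to the plumber/indexer...
  }
}
finally {
  firehose.close(); // closes any still-open LineIterator
}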

s3-extensions/pom.xml Normal file
View File

@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012, 2013 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-s3-extensions</artifactId>
<name>druid-s3-extensions</name>
<description>druid-s3-extensions</description>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<version>0.8.1</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -17,25 +17,25 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.index;
package io.druid.firehose.s3;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.StringInputRowParser;
import io.druid.segment.realtime.firehose.FileIteratingFirehose;
import io.druid.data.input.impl.FileIteratingFirehose;
import io.druid.data.input.impl.StringInputRowParser;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.jets3t.service.S3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import java.io.BufferedReader;
@ -51,7 +51,6 @@ import java.util.zip.GZIPInputStream;
/**
* Builds firehoses that read from a predefined list of S3 objects and then dry up.
*/
@JsonTypeName("s3")
public class StaticS3FirehoseFactory implements FirehoseFactory
{
private static final Logger log = new Logger(StaticS3FirehoseFactory.class);
@ -119,8 +118,9 @@ public class StaticS3FirehoseFactory implements FirehoseFactory
log.info("Reading from bucket[%s] object[%s] (%s)", s3Bucket, s3Object.getKey(), nextURI);
try {
final InputStream innerInputStream = s3Client.getObject(s3Bucket, s3Object.getKey())
.getDataInputStream();
final InputStream innerInputStream = s3Client.getObject(
new S3Bucket(s3Bucket), s3Object.getKey())
.getDataInputStream();
final InputStream outerInputStream = s3Object.getKey().endsWith(".gz")
? new GZIPInputStream(innerInputStream)

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
package io.druid.storage.s3;
import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -17,11 +17,13 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
package io.druid.storage.s3;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
package io.druid.storage.s3;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
@ -27,14 +27,15 @@ import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.CompressionUtils;
import io.druid.storage.s3.S3Utils;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import org.apache.commons.io.FileUtils;
import org.jets3t.service.ServiceException;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import java.io.File;
import java.io.IOException;
@ -92,7 +93,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller
S3Object s3Obj = null;
try {
s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path);
s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path);
InputStream in = null;
try {
@ -154,7 +155,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller
@Override
public Boolean call() throws Exception
{
return s3Client.isObjectInBucket(coords.bucket, coords.path);
return S3Utils.isObjectInBucket(s3Client, coords.bucket, coords.path);
}
}
);
@ -165,7 +166,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller
catch (IOException e) {
throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
}
catch (ServiceException e) {
catch (S3ServiceException e) {
throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
}
}
@ -175,13 +176,13 @@ public class S3DataSegmentPuller implements DataSegmentPuller
{
final S3Coords coords = new S3Coords(segment);
try {
final StorageObject objDetails = S3Utils.retryS3Operation(
new Callable<StorageObject>()
final S3Object objDetails = S3Utils.retryS3Operation(
new Callable<S3Object>()
{
@Override
public StorageObject call() throws Exception
public S3Object call() throws Exception
{
return s3Client.getObjectDetails(coords.bucket, coords.path);
return s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path);
}
}
);
@ -190,10 +191,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
catch (IOException e) {
throw new SegmentLoadingException(e, e.getMessage());
}
catch (ServiceException e) {
catch (S3ServiceException | IOException e) {
throw new SegmentLoadingException(e, e.getMessage());
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
package io.druid.storage.s3;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
@ -27,11 +27,13 @@ import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.common.utils.CompressionUtils;
import io.druid.segment.IndexIO;
import io.druid.storage.s3.S3Utils;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import org.jets3t.service.ServiceException;
import org.jets3t.service.acl.AccessControlList;
import org.jets3t.service.acl.gs.GSAccessControlList;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
@ -61,9 +63,10 @@ public class S3DataSegmentPusher implements DataSegmentPusher
this.jsonMapper = jsonMapper;
}
public S3DataSegmentPusherConfig getConfig()
@Override
public String getPathForHadoop(String dataSource)
{
return config;
return String.format("s3n://%s/%s/%s", config.getBucket(), config.getBaseKey(), dataSource);
}
@Override
@ -90,7 +93,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
toPush.setBucketName(outputBucket);
toPush.setKey(outputKey + "/index.zip");
if (!config.getDisableAcl()) {
toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
toPush.setAcl(AccessControlList.REST_CANNED_AUTHENTICATED_READ);
}
log.info("Pushing %s.", toPush);
@ -107,7 +110,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
toPush.getKey()
)
)
.withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));
.withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
File descriptorFile = File.createTempFile("druid", "descriptor.json");
Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(inSegment)), descriptorFile);
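The getPathForHadoop change above is the heart of this commit: the Hadoop indexer can now ask any pusher for its deep-storage root instead of reaching into storage-specific config via getConfig(). With hypothetical settings (bucket "my-bucket", baseKey "druid/segments"), the contract would look like:

// pusher.getPathForHadoop("wikipedia")
//   -> "s3n://my-bucket/druid/segments/wikipedia"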

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
package io.druid.storage.s3;
import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -0,0 +1,79 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.s3;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import io.druid.guice.Binders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.initialization.DruidModule;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.util.List;
/**
*/
public class S3StorageDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.s3", AWSCredentialsConfig.class);
Binders.dataSegmentPullerBinder(binder).addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class);
Binders.dataSegmentKillerBinder(binder).addBinding("s3_zip").to(S3DataSegmentKiller.class).in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
Binders.taskLogsBinder(binder).addBinding("s3").to(S3TaskLogs.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs", S3TaskLogsConfig.class);
binder.bind(S3TaskLogs.class).in(LazySingleton.class);
}
@Provides
@LazySingleton
public org.jets3t.service.security.AWSCredentials getJets3tAWSCredentials(AWSCredentialsConfig config)
{
return new org.jets3t.service.security.AWSCredentials(config.getAccessKey(), config.getSecretKey());
}
@Provides
@LazySingleton
public RestS3Service getRestS3Service(org.jets3t.service.security.AWSCredentials credentials)
{
try {
return new RestS3Service(credentials);
}
catch (S3ServiceException e) {
throw new ProvisionException("Unable to create a RestS3Service", e);
}
}
}
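The JsonConfigProvider and binder calls above imply runtime properties roughly like the sketch below. The key prefixes come from the bind() calls (plus the "druid.storage.type" PolyBind choice shown later in this diff); the bucket/baseKey names are inferred from the pusher config getters, and all values are hypothetical.

druid.storage.type=s3
druid.s3.accessKey=ACCESS_KEY
druid.s3.secretKey=SECRET_KEY
druid.storage.bucket=my-bucket
druid.storage.baseKey=druid/segments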

View File

@ -17,14 +17,14 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.tasklogs;
package io.druid.storage.s3;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.storage.s3.S3Utils;
import io.druid.tasklogs.TaskLogs;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageService;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.tasklogs;
package io.druid.storage.s3;
import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -21,7 +21,9 @@ package io.druid.storage.s3;
import com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
import org.jets3t.service.ServiceException;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import java.io.IOException;
@ -53,7 +55,7 @@ public class S3Utils
* Retries S3 operations that fail due to IO-related exceptions. Service-level exceptions (access denied, file not
* found, etc.) are not retried.
*/
public static <T> T retryS3Operation(Callable<T> f) throws IOException, ServiceException, InterruptedException
public static <T> T retryS3Operation(Callable<T> f) throws IOException, S3ServiceException, InterruptedException
{
int nTry = 0;
final int maxTries = 3;
@ -69,10 +71,10 @@ public class S3Utils
throw e;
}
}
catch (ServiceException e) {
catch (S3ServiceException e) {
if (nTry <= maxTries &&
(e.getCause() instanceof IOException ||
(e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) {
(e.getS3ErrorCode() != null && e.getS3ErrorCode().equals("RequestTimeout")))) {
awaitNextRetry(e, nTry);
} else {
throw e;
@ -96,4 +98,29 @@ public class S3Utils
log.info(e, "S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis);
Thread.sleep(sleepMillis);
}
public static boolean isObjectInBucket(RestS3Service s3Client, String bucketName, String objectKey)
throws S3ServiceException
{
try {
s3Client.getObjectDetails(new S3Bucket(bucketName), objectKey);
}
catch (S3ServiceException e) {
if (404 == e.getResponseCode()
|| "NoSuchKey".equals(e.getS3ErrorCode())
|| "NoSuchBucket".equals(e.getS3ErrorCode()))
{
return false;
}
if ("AccessDenied".equals(e.getS3ErrorCode()))
{
// Object is inaccessible to current user, but does exist.
return true;
}
// Something else has gone wrong
throw e;
}
return true;
}
}
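A usage sketch for retryS3Operation, mirroring how S3DataSegmentPuller calls it earlier in this diff: only IO-rooted failures and "RequestTimeout" errors are retried, a small fixed number of times with a growing sleep between attempts. "s3Client" is assumed to be a RestS3Service in scope, and the bucket and key are hypothetical; the surrounding method must handle IOException, S3ServiceException, and InterruptedException.

import java.util.concurrent.Callable;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;

S3Object details = S3Utils.retryS3Operation(
    new Callable<S3Object>()
    {
      @Override
      public S3Object call() throws Exception
      {
        // Retried on transient failures; service errors propagate immediately.
        return s3Client.getObjectDetails(new S3Bucket("my-bucket"), "some/key");
      }
    }
);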

View File

@ -0,0 +1 @@
io.druid.storage.s3.S3StorageDruidModule

View File

@ -136,10 +136,6 @@
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
@ -164,10 +160,6 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
</dependency>
<dependency>
<groupId>io.tesla.aether</groupId>
<artifactId>tesla-aether</artifactId>
@ -176,29 +168,6 @@
<groupId>org.eclipse.aether</groupId>
<artifactId>aether-api</artifactId>
</dependency>
<!-- Dependencies required for jets3t b/c emr pom doesn't include them -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.jamesmurty.utils</groupId>
<artifactId>java-xmlbuilder</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
@ -208,8 +177,8 @@
<artifactId>spymemcached</artifactId>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
@ -223,6 +192,22 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
</dependency>
<dependency>
<groupId>kafka</groupId>
<artifactId>core-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>com.ircclouds.irc</groupId>
<artifactId>irc-api</artifactId>
</dependency>
<dependency>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
</dependency>
<!-- Tests -->
<dependency>

View File

@ -23,13 +23,10 @@ import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import io.druid.segment.loading.AWSCredentialsConfig;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
/**
*/
@ -48,29 +45,30 @@ public class AWSModule implements Module
return new BasicAWSCredentials(config.getAccessKey(), config.getSecretKey());
}
@Provides
@LazySingleton
public org.jets3t.service.security.AWSCredentials getJets3tAWSCredentials(AWSCredentialsConfig config)
{
return new org.jets3t.service.security.AWSCredentials(config.getAccessKey(), config.getSecretKey());
}
@Provides
@LazySingleton
public RestS3Service getRestS3Service(org.jets3t.service.security.AWSCredentials credentials)
{
try {
return new RestS3Service(credentials);
}
catch (S3ServiceException e) {
throw new ProvisionException("Unable to create a RestS3Service", e);
}
}
@Provides
@LazySingleton
public AmazonEC2 getEc2Client(AWSCredentials credentials)
{
return new AmazonEC2Client(credentials);
}
public static class AWSCredentialsConfig
{
@JsonProperty
private String accessKey = "";
@JsonProperty
private String secretKey = "";
public String getAccessKey()
{
return accessKey;
}
public String getSecretKey()
{
return secretKey;
}
}
}
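The inlined AWSCredentialsConfig above is a plain Jackson-annotated config holder. A minimal sketch of how it would be surfaced to Guice with the JsonConfigProvider pattern used throughout this commit; the "druid.aws" property prefix is an assumption, not taken from the diff.

import com.google.inject.Binder;
import com.google.inject.Module;
import io.druid.guice.AWSModule;
import io.druid.guice.JsonConfigProvider;

public class AwsConfigWiringSketch implements Module
{
  @Override
  public void configure(Binder binder)
  {
    // maps druid.aws.accessKey / druid.aws.secretKey properties onto the config object
    JsonConfigProvider.bind(binder, "druid.aws", AWSModule.AWSCredentialsConfig.class);
  }
}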

View File

@ -23,18 +23,11 @@ import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.HdfsDataSegmentPuller;
import io.druid.segment.loading.HdfsDataSegmentPusher;
import io.druid.segment.loading.HdfsDataSegmentPusherConfig;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.LocalDataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPusherConfig;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.S3DataSegmentPuller;
import io.druid.segment.loading.S3DataSegmentPusher;
import io.druid.segment.loading.S3DataSegmentPusherConfig;
import io.druid.segment.loading.SegmentLoader;
import org.apache.hadoop.conf.Configuration;
/**
*/
@ -46,8 +39,6 @@ public class DataSegmentPusherPullerModule implements Module
binder.bind(SegmentLoader.class).to(OmniSegmentLoader.class).in(LazySingleton.class);
bindDeepStorageLocal(binder);
bindDeepStorageS3(binder);
bindDeepStorageHdfs(binder);
PolyBind.createChoice(
binder, "druid.storage.type", Key.get(DataSegmentPusher.class), Key.get(LocalDataSegmentPusher.class)
@ -56,7 +47,7 @@ public class DataSegmentPusherPullerModule implements Module
private static void bindDeepStorageLocal(Binder binder)
{
DruidBinders.dataSegmentPullerBinder(binder)
Binders.dataSegmentPullerBinder(binder)
.addBinding("local")
.to(LocalDataSegmentPuller.class)
.in(LazySingleton.class);
@ -67,34 +58,4 @@ public class DataSegmentPusherPullerModule implements Module
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.storage", LocalDataSegmentPusherConfig.class);
}
private static void bindDeepStorageS3(Binder binder)
{
DruidBinders.dataSegmentPullerBinder(binder)
.addBinding("s3_zip")
.to(S3DataSegmentPuller.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class))
.addBinding("s3")
.to(S3DataSegmentPusher.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
}
private static void bindDeepStorageHdfs(Binder binder)
{
DruidBinders.dataSegmentPullerBinder(binder)
.addBinding("hdfs")
.to(HdfsDataSegmentPuller.class)
.in(LazySingleton.class);
binder.bind(Configuration.class).toInstance(new Configuration());
PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class))
.addBinding("hdfs")
.to(HdfsDataSegmentPusher.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.storage", HdfsDataSegmentPusherConfig.class);
}
}
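PolyBind ties the "druid.storage.type" property to one of the registered options, defaulting to the local pusher now that S3 and HDFS have moved out to extensions. A hedged sketch of how an additional deep-storage type would plug into this choice; the "example" key and the stub pusher are hypothetical.

import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;

import java.io.File;
import java.io.IOException;

public class ExampleStorageModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    // selected when druid.storage.type=example
    PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class))
            .addBinding("example")
            .to(ExampleDataSegmentPusher.class)
            .in(LazySingleton.class);
  }

  public static class ExampleDataSegmentPusher implements DataSegmentPusher
  {
    @Override
    public String getPathForHadoop(String dataSource)
    {
      throw new UnsupportedOperationException("example storage has no Hadoop path");
    }

    @Override
    public DataSegment push(File indexFilesDir, DataSegment segment) throws IOException
    {
      return segment; // stand-in: a real pusher uploads the files and rewrites the loadSpec
    }
  }
}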

View File

@ -27,7 +27,6 @@ import com.metamx.metrics.Monitor;
import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.server.DruidNode;
/**
@ -48,11 +47,6 @@ public class DruidBinders
);
}
public static MapBinder<String, DataSegmentPuller> dataSegmentPullerBinder(Binder binder)
{
return MapBinder.newMapBinder(binder, String.class, DataSegmentPuller.class);
}
public static Multibinder<KeyHolder<DruidNode>> discoveryAnnouncementBinder(Binder binder)
{
return Multibinder.newSetBinder(binder, new TypeLiteral<KeyHolder<DruidNode>>(){});

View File

@ -23,17 +23,12 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.data.input.ProtoBufInputRowParser;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
@ -55,19 +50,15 @@ public class FirehoseModule implements DruidModule
return Arrays.<Module>asList(
new SimpleModule("FirehoseModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(LocalFirehoseFactory.class, "local")
new NamedType(LocalFirehoseFactory.class, "local"),
new NamedType(ProtoBufInputRowParser.class, "protobuf")
)
);
}
}
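Each NamedType above registers a Jackson subtype keyed by the JSON "type" field, which is how a firehose spec selects its factory. A self-contained toy showing the mechanism; the types here are deliberately not Druid's own.

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;

public class NamedTypeSketch
{
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  public interface Factory {}

  public static class LocalFactory implements Factory {}

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(
        new SimpleModule("example").registerSubtypes(new NamedType(LocalFactory.class, "local"))
    );
    // the "type" discriminator routes deserialization to the registered subtype
    Factory f = mapper.readValue("{\"type\": \"local\"}", Factory.class);
    System.out.println(f.getClass().getSimpleName()); // LocalFactory
  }
}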

View File

@ -23,11 +23,9 @@ import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import io.druid.indexing.common.tasklogs.NoopTaskLogs;
import io.druid.indexing.common.tasklogs.S3TaskLogs;
import io.druid.indexing.common.tasklogs.S3TaskLogsConfig;
import io.druid.indexing.common.tasklogs.TaskLogPusher;
import io.druid.indexing.common.tasklogs.TaskLogs;
import io.druid.tasklogs.NoopTaskLogs;
import io.druid.tasklogs.TaskLogPusher;
import io.druid.tasklogs.TaskLogs;
/**
*/
@ -37,12 +35,8 @@ public class TaskLogsModule implements Module
public void configure(Binder binder)
{
PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(NoopTaskLogs.class));
final MapBinder<String, TaskLogs> taskLogBinder = PolyBind.optionBinder(binder, Key.get(TaskLogs.class));
JsonConfigProvider.bind(binder, "druid.indexer.logs", S3TaskLogsConfig.class);
taskLogBinder.addBinding("s3").to(S3TaskLogs.class);
binder.bind(S3TaskLogs.class).in(LazySingleton.class);
final MapBinder<String, TaskLogs> taskLogBinder = Binders.taskLogsBinder(binder);
taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class);
binder.bind(NoopTaskLogs.class).in(LazySingleton.class);
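Binders.taskLogsBinder comes from the new druid-api; presumably it is the same MapBinder idiom this commit removes from DruidBinders, roughly as follows. This is a sketch of its likely shape, not the verbatim druid-api source.

import com.google.inject.Binder;
import com.google.inject.multibindings.MapBinder;
import io.druid.tasklogs.TaskLogs;

public class BindersSketch
{
  public static MapBinder<String, TaskLogs> taskLogsBinder(Binder binder)
  {
    // keyed by druid.indexer.logs.type, mirroring the dataSegmentPullerBinder pattern
    return MapBinder.newMapBinder(binder, String.class, TaskLogs.class);
  }
}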

View File

@ -17,14 +17,14 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
package io.druid.initialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
@ -38,12 +38,14 @@ import io.druid.guice.AWSModule;
import io.druid.guice.AnnouncerModule;
import io.druid.guice.DataSegmentPusherPullerModule;
import io.druid.guice.DbConnectorModule;
import io.druid.guice.DruidGuiceExtensions;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.DruidSecondaryModule;
import io.druid.guice.FirehoseModule;
import io.druid.guice.HttpClientModule;
import io.druid.guice.IndexingServiceDiscoveryModule;
import io.druid.guice.JacksonConfigManagerModule;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LifecycleModule;
import io.druid.guice.QueryRunnerFactoryModule;
import io.druid.guice.QueryableModule;
@ -54,10 +56,12 @@ import io.druid.guice.TaskLogsModule;
import io.druid.guice.annotations.Client;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.initialization.DruidModule;
import io.druid.jackson.JacksonModule;
import io.druid.server.initialization.ConfigModule;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.ExtensionsConfig;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.initialization.PropertiesModule;
import io.druid.server.metrics.MetricsModule;
import io.tesla.aether.TeslaAether;
import io.tesla.aether.internal.DefaultTeslaAether;
@ -68,10 +72,14 @@ import org.eclipse.aether.graph.Dependency;
import org.eclipse.aether.graph.DependencyFilter;
import org.eclipse.aether.graph.DependencyNode;
import org.eclipse.aether.resolution.DependencyRequest;
import org.eclipse.aether.resolution.DependencyResolutionException;
import org.eclipse.aether.util.artifact.JavaScopes;
import org.eclipse.aether.util.filter.DependencyFilterUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Collections;
@ -82,11 +90,10 @@ import java.util.Set;
/**
*/
public class
Initialization
public class Initialization
{
private static final Logger log = new Logger(Initialization.class);
private static final Map<String, ClassLoader> loadersMap = Maps.newHashMap();
private static final Map<String, URLClassLoader> loadersMap = Maps.newHashMap();
private static final Set<String> exclusions = Sets.newHashSet(
"io.druid",
@ -108,57 +115,7 @@ public class
for (String coordinate : config.getCoordinates()) {
log.info("Loading extension[%s]", coordinate);
try {
ClassLoader loader = loadersMap.get(coordinate);
if (loader == null) {
final CollectRequest collectRequest = new CollectRequest();
collectRequest.setRoot(new Dependency(new DefaultArtifact(coordinate), JavaScopes.RUNTIME));
DependencyRequest dependencyRequest = new DependencyRequest(
collectRequest,
DependencyFilterUtils.andFilter(
DependencyFilterUtils.classpathFilter(JavaScopes.RUNTIME),
new DependencyFilter()
{
@Override
public boolean accept(DependencyNode node, List<DependencyNode> parents)
{
if (accept(node.getArtifact())) {
return false;
}
for (DependencyNode parent : parents) {
if (accept(parent.getArtifact())) {
return false;
}
}
return true;
}
private boolean accept(final Artifact artifact)
{
return exclusions.contains(artifact.getGroupId());
}
}
)
);
final List<Artifact> artifacts = aether.resolveArtifacts(dependencyRequest);
List<URL> urls = Lists.newArrayListWithExpectedSize(artifacts.size());
for (Artifact artifact : artifacts) {
if (!exclusions.contains(artifact.getGroupId())) {
urls.add(artifact.getFile().toURI().toURL());
} else {
log.error("Skipped Artifact[%s]", artifact);
}
}
for (URL url : urls) {
log.info("Added URL[%s]", url);
}
loader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader());
loadersMap.put(coordinate, loader);
}
URLClassLoader loader = getClassLoaderForCoordinates(aether, coordinate);
final ServiceLoader<T> serviceLoader = ServiceLoader.load(clazz, loader);
@ -175,7 +132,64 @@ public class
return retVal;
}
private static DefaultTeslaAether getAetherClient(ExtensionsConfig config)
public static URLClassLoader getClassLoaderForCoordinates(TeslaAether aether, String coordinate)
throws DependencyResolutionException, MalformedURLException
{
URLClassLoader loader = loadersMap.get(coordinate);
if (loader == null) {
final CollectRequest collectRequest = new CollectRequest();
collectRequest.setRoot(new Dependency(new DefaultArtifact(coordinate), JavaScopes.RUNTIME));
DependencyRequest dependencyRequest = new DependencyRequest(
collectRequest,
DependencyFilterUtils.andFilter(
DependencyFilterUtils.classpathFilter(JavaScopes.RUNTIME),
new DependencyFilter()
{
@Override
public boolean accept(DependencyNode node, List<DependencyNode> parents)
{
if (accept(node.getArtifact())) {
return false;
}
for (DependencyNode parent : parents) {
if (accept(parent.getArtifact())) {
return false;
}
}
return true;
}
private boolean accept(final Artifact artifact)
{
return exclusions.contains(artifact.getGroupId());
}
}
)
);
final List<Artifact> artifacts = aether.resolveArtifacts(dependencyRequest);
List<URL> urls = Lists.newArrayListWithExpectedSize(artifacts.size());
for (Artifact artifact : artifacts) {
if (!exclusions.contains(artifact.getGroupId())) {
urls.add(artifact.getFile().toURI().toURL());
} else {
log.debug("Skipped Artifact[%s]", artifact);
}
}
for (URL url : urls) {
log.info("Added URL[%s]", url);
}
loader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader());
loadersMap.put(coordinate, loader);
}
return loader;
}
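Downstream of the class loader built above, extension modules are found with the JDK ServiceLoader. A sketch of that step; the Maven coordinate is hypothetical.

import io.druid.initialization.DruidModule;
import io.tesla.aether.TeslaAether;

import java.net.URLClassLoader;
import java.util.ServiceLoader;

public class ExtensionDiscoverySketch
{
  public static void printModules(TeslaAether aether) throws Exception
  {
    URLClassLoader loader =
        Initialization.getClassLoaderForCoordinates(aether, "io.druid.extensions:druid-s3-extensions:0.6.0");

    // ServiceLoader reads META-INF/services/io.druid.initialization.DruidModule
    // from the extension jar -- the one-line service file added earlier in this
    // diff is exactly such a registration.
    for (DruidModule module : ServiceLoader.load(DruidModule.class, loader)) {
      System.out.println("discovered " + module.getClass().getName());
    }
  }
}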
public static DefaultTeslaAether getAetherClient(ExtensionsConfig config)
{
/*
DefaultTeslaAether logs a bunch of stuff to System.out, which is annoying. We choose to disable that
@ -194,7 +208,28 @@ public class
PrintStream oldOut = System.out;
try {
System.setOut(new PrintStream(ByteStreams.nullOutputStream()));
System.setOut(new PrintStream(
new OutputStream()
{
@Override
public void write(int b) throws IOException
{
}
@Override
public void write(byte[] b) throws IOException
{
}
@Override
public void write(byte[] b, int off, int len) throws IOException
{
}
}
));
return new DefaultTeslaAether(config.getLocalRepository(), config.getRemoteRepositories());
}
finally {
@ -244,6 +279,25 @@ public class
return Guice.createInjector(Modules.override(defaultModules.getModules()).with(actualModules.getModules()));
}
public static Injector makeStartupInjector()
{
return Guice.createInjector(
new DruidGuiceExtensions(),
new JacksonModule(),
new PropertiesModule("runtime.properties"),
new ConfigModule(),
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(DruidSecondaryModule.class);
JsonConfigProvider.bind(binder, "druid.extensions", ExtensionsConfig.class);
}
}
);
}
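A sketch of the bootstrap sequence this enables, assuming (as the pattern elsewhere in this commit suggests) that JsonConfigProvider.bind makes ExtensionsConfig directly injectable from the startup injector:

import com.google.inject.Injector;
import io.druid.initialization.Initialization;
import io.druid.server.initialization.ExtensionsConfig;

public class StartupSketch
{
  public static void main(String[] args)
  {
    Injector startupInjector = Initialization.makeStartupInjector();
    ExtensionsConfig config = startupInjector.getInstance(ExtensionsConfig.class);
    System.out.println("extension coordinates: " + config.getCoordinates());
  }
}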
private static class ModuleList
{
private final Injector baseInjector;

View File

@ -1,63 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
import com.google.common.base.Joiner;
import io.druid.timeline.DataSegment;
import org.joda.time.format.ISODateTimeFormat;
/**
*/
public class DataSegmentPusherUtil
{
private static final Joiner JOINER = Joiner.on("/").skipNulls();
public static String getStorageDir(DataSegment segment)
{
return JOINER.join(
segment.getDataSource(),
String.format(
"%s_%s",
segment.getInterval().getStart(),
segment.getInterval().getEnd()
),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
);
}
/**
* Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in
* path names. So we format paths differently for HDFS.
*/
public static String getHdfsStorageDir(DataSegment segment)
{
return JOINER.join(
segment.getDataSource(),
String.format(
"%s_%s",
segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()),
segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
),
segment.getVersion().replaceAll(":", "_"),
segment.getShardSpec().getPartitionNum()
);
}
}
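A worked example of the two layouts above, for a segment of datasource "wikipedia", interval 2013-01-01/2013-01-02 (UTC), version "2013-01-03T00:00:00.000Z", partition 0. Values are illustrative; the exact zone suffix in the HDFS interval depends on how Joda renders basicDateTime.

getStorageDir:     wikipedia/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/2013-01-03T00:00:00.000Z/0
getHdfsStorageDir: wikipedia/20130101T000000.000Z_20130102T000000.000Z/2013-01-03T00_00_00.000Z/0

Note how the HDFS variant compacts the interval and replaces ":" in the version, per HDFS-13.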

View File

@ -22,8 +22,8 @@ package io.druid.segment.loading;
import com.google.common.io.Files;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.CompressionUtils;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import java.io.File;
import java.io.IOException;

View File

@ -25,9 +25,9 @@ import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.CompressionUtils;
import io.druid.segment.IndexIO;
import io.druid.segment.SegmentUtils;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import java.io.File;
import java.io.IOException;
@ -51,6 +51,12 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
this.jsonMapper = jsonMapper;
}
@Override
public String getPathForHadoop(String dataSource)
{
return String.format("file://%s/%s", config.getStorageDirectory(), dataSource);
}
@Override
public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOException
{
@ -65,7 +71,7 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
return createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outDir))
.withSize(size)
.withBinaryVersion(IndexIO.getVersionFromDir(dataSegmentFile)),
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)),
outDir
);
}
@ -78,7 +84,7 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
return createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outFile))
.withSize(size)
.withBinaryVersion(IndexIO.getVersionFromDir(dataSegmentFile)),
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)),
outDir
);
}
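The new getPathForHadoop simply prefixes the configured storage directory with a file URI. A quick check of the format; the directory value is hypothetical.

public class HadoopPathSketch
{
  public static void main(String[] args)
  {
    // "file://" plus an absolute storage directory yields a triple-slash URI
    System.out.println(String.format("file://%s/%s", "/tmp/druid/localStorage", "wikipedia"));
    // -> file:///tmp/druid/localStorage/wikipedia
  }
}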

View File

@ -0,0 +1,60 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import io.druid.timeline.DataSegment;
import java.util.Map;
/**
*/
public class OmniDataSegmentKiller implements DataSegmentKiller
{
private final Map<String, DataSegmentKiller> killers;
@Inject
public OmniDataSegmentKiller(
Map<String, DataSegmentKiller> killers
)
{
this.killers = killers;
}
@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
getKiller(segment).kill(segment);
}
private DataSegmentKiller getKiller(DataSegment segment) throws SegmentLoadingException
{
String type = MapUtils.getString(segment.getLoadSpec(), "type");
DataSegmentKiller killer = killers.get(type);
if (killer == null) {
throw new SegmentLoadingException("Unknown killer type[%s]. Known types are %s", type, killers.keySet());
}
return killer;
}
}
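The injected map is expected to be filled by a Guice MapBinder keyed on the loadSpec "type", following the dataSegmentPullerBinder pattern seen earlier in this diff. A sketch; the "example" key and the stub killer are hypothetical.

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;

public class ExampleKillerModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    // a segment whose loadSpec is {"type": "example", ...} is routed to this binding
    MapBinder.newMapBinder(binder, String.class, DataSegmentKiller.class)
             .addBinding("example")
             .to(ExampleDataSegmentKiller.class);
  }

  public static class ExampleDataSegmentKiller implements DataSegmentKiller
  {
    @Override
    public void kill(DataSegment segment) throws SegmentLoadingException
    {
      // stand-in: a real killer deletes the segment's files from deep storage
    }
  }
}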

Some files were not shown because too many files have changed in this diff.