Merge pull request #218 from metamx/guice-rt

Port Realtime nodes to guice
This commit is contained in:
cheddar 2013-08-22 16:00:12 -07:00
commit 1aa99f5878
46 changed files with 1414 additions and 677 deletions

View File

@ -1,7 +1,9 @@
package com.metamx.druid.coordination;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.guice.annotations.Json;
import javax.validation.constraints.NotNull;
import java.util.Arrays;
@ -18,10 +20,6 @@ public class LegacyDataSegmentAnnouncerProvider implements DataSegmentAnnouncerP
@NotNull
private BatchDataSegmentAnnouncer batchAnnouncer = null;
@JacksonInject
@NotNull
private Lifecycle lifecycle = null;
@Override
public DataSegmentAnnouncer get()
{

View File

@ -0,0 +1,35 @@
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncerProvider;
import com.metamx.druid.coordination.SingleDataSegmentAnnouncer;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.curator.framework.CuratorFramework;
/**
*/
public class AnnouncerModule implements Module
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.announcer", BatchDataSegmentAnnouncerConfig.class);
JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class);
binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class);
binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleLast.class);
binder.bind(SingleDataSegmentAnnouncer.class).in(ManageLifecycleLast.class);
}
@Provides
@ManageLifecycle
public Announcer getAnnouncer(CuratorFramework curator)
{
return new Announcer(curator, Execs.singleThreaded("Announcer-%s"));
}
}

View File

@ -7,7 +7,7 @@ cd druid-services-* 2>&1 > /dev/null
mkdir logs 2>&1 > /dev/null
# Now start a realtime node
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=config/realtime/realtime.spec -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/realtime com.metamx.druid.realtime.RealtimeMain 2>&1 > logs/realtime.log &
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=config/realtime/realtime.spec -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/realtime io.druid.cli.Main server realtime 2>&1 > logs/realtime.log &
# And a master node
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/master io.druid.cli.Main server coordinator 2>&1 > logs/master.log &

View File

@ -63,4 +63,4 @@ DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config
echo "Running command:"
(set -x; java ${JAVA_ARGS} -classpath ${DRUID_CP} druid.examples.RealtimeStandaloneMain)
(set -x; java ${JAVA_ARGS} -classpath ${DRUID_CP} io.druid.cli.Main druid server realtimeStandalone)

View File

@ -1,17 +1,25 @@
package druid.examples;
package druid.examples.guice;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.metamx.common.lifecycle.Lifecycle;
import com.google.inject.Binder;
import com.google.inject.TypeLiteral;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.guice.FireDepartmentsProvider;
import com.metamx.druid.guice.JsonConfigProvider;
import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.druid.guice.NoopSegmentPublisherProvider;
import com.metamx.druid.guice.RealtimeManagerConfig;
import com.metamx.druid.initialization.DruidModule;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.realtime.RealtimeNode;
import com.metamx.druid.realtime.FireDepartment;
import com.metamx.druid.realtime.RealtimeManager;
import com.metamx.druid.realtime.SegmentPublisher;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
@ -20,64 +28,46 @@ import druid.examples.web.WebFirehoseFactory;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
/**
* Standalone Demo Realtime process.
*/
public class RealtimeStandaloneMain
public class RealtimeExampleModule implements DruidModule
{
private static final Logger log = new Logger(RealtimeStandaloneMain.class);
private static final Logger log = new Logger(RealtimeExampleModule.class);
public static void main(String[] args) throws Exception
@Override
public void configure(Binder binder)
{
LogLevelAdjuster.register();
binder.bind(SegmentPublisher.class).toProvider(NoopSegmentPublisherProvider.class);
binder.bind(DataSegmentPusher.class).to(NoopDataSegmentPusher.class);
binder.bind(DataSegmentAnnouncer.class).to(NoopDataSegmentAnnouncer.class);
binder.bind(InventoryView.class).to(NoopInventoryView.class);
binder.bind(ServerView.class).to(NoopServerView.class);
final Lifecycle lifecycle = new Lifecycle();
RealtimeNode rn = RealtimeNode.builder().build();
lifecycle.addManagedInstance(rn);
// register the Firehoses
rn.registerJacksonSubtype(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream")
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(
new TypeLiteral<List<FireDepartment>>()
{
}
).toProvider(FireDepartmentsProvider.class);
binder.bind(RealtimeManager.class).in(ManageLifecycle.class);
}
@Override
public List<com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return Arrays.<com.fasterxml.jackson.databind.Module>asList(
new SimpleModule("RealtimeExampleModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream")
)
);
// Create dummy objects for the various interfaces that interact with the DB, ZK and deep storage
rn.setSegmentPublisher(new NoopSegmentPublisher());
rn.setAnnouncer(new NoopDataSegmentAnnouncer());
rn.setDataSegmentPusher(new NoopDataSegmentPusher());
rn.setServerView(new NoopServerView());
rn.setInventoryView(new NoopInventoryView());
Runtime.getRuntime().addShutdownHook(
new Thread(
new Runnable()
{
@Override
public void run()
{
log.info("Running shutdown hook");
lifecycle.stop();
}
}
)
);
try {
lifecycle.start();
}
catch (Throwable t) {
log.info(t, "Throwable caught at startup, committing seppuku");
t.printStackTrace();
System.exit(2);
}
lifecycle.join();
}
private static class NoopServerView implements ServerView
@ -87,7 +77,7 @@ public class RealtimeStandaloneMain
Executor exec, ServerCallback callback
)
{
// do nothing
}
@Override
@ -95,7 +85,7 @@ public class RealtimeStandaloneMain
Executor exec, SegmentCallback callback
)
{
// do nothing
}
}
@ -123,15 +113,6 @@ public class RealtimeStandaloneMain
}
}
private static class NoopSegmentPublisher implements SegmentPublisher
{
@Override
public void publishSegment(DataSegment segment) throws IOException
{
// do nothing
}
}
private static class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer
{
@Override
@ -158,4 +139,4 @@ public class RealtimeStandaloneMain
// do nothing
}
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.db.DbTablesConfig;
import com.metamx.druid.realtime.DbSegmentPublisher;
import com.metamx.druid.realtime.DbSegmentPublisherConfig;
import com.metamx.druid.realtime.SegmentPublisher;
import org.skife.jdbi.v2.IDBI;
import javax.validation.constraints.NotNull;
/**
*/
public class DbSegmentPublisherProvider implements SegmentPublisherProvider
{
@JacksonInject
@NotNull
private IDBI idbi = null;
@JacksonInject
@NotNull
private DbTablesConfig config = null;
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
@Override
public SegmentPublisher get()
{
return new DbSegmentPublisher(jsonMapper, config, idbi);
}
}

View File

@ -0,0 +1,64 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.metamx.druid.realtime.FireDepartment;
import java.util.List;
/**
*/
public class FireDepartmentsProvider implements Provider<List<FireDepartment>>
{
private final List<FireDepartment> fireDepartments = Lists.newArrayList();
@Inject
public FireDepartmentsProvider(
ObjectMapper jsonMapper,
RealtimeManagerConfig config
)
{
try {
this.fireDepartments.addAll(
(List<FireDepartment>) jsonMapper.readValue(
config.getSpecFile(), new TypeReference<List<FireDepartment>>()
{
}
)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public List<FireDepartment> get()
{
return fireDepartments;
}
}

View File

@ -0,0 +1,34 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.metamx.druid.realtime.NoopSegmentPublisher;
import com.metamx.druid.realtime.SegmentPublisher;
/**
*/
public class NoopSegmentPublisherProvider implements SegmentPublisherProvider
{
@Override
public SegmentPublisher get()
{
return new NoopSegmentPublisher();
}
}

View File

@ -0,0 +1,37 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.File;
/**
*/
public class RealtimeManagerConfig
{
@JsonProperty
private File specFile;
public File getSpecFile()
{
return specFile;
}
}

View File

@ -0,0 +1,53 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.metamx.common.logger.Logger;
import com.metamx.druid.realtime.DbSegmentPublisherConfig;
import com.metamx.druid.realtime.FireDepartment;
import com.metamx.druid.realtime.RealtimeManager;
import com.metamx.druid.realtime.SegmentPublisher;
import java.util.List;
/**
*/
public class RealtimeModule implements Module
{
private static final Logger log = new Logger(RealtimeModule.class);
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.publish", SegmentPublisherProvider.class);
binder.bind(SegmentPublisher.class).toProvider(SegmentPublisherProvider.class);
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(
new TypeLiteral<List<FireDepartment>>()
{
}
).toProvider(FireDepartmentsProvider.class);
binder.bind(RealtimeManager.class).in(ManageLifecycle.class);
}
}

View File

@ -0,0 +1,35 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.inject.Provider;
import com.metamx.druid.realtime.SegmentPublisher;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = NoopSegmentPublisherProvider.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "db", value = DbSegmentPublisherProvider.class)
})
public interface SegmentPublisherProvider extends Provider<SegmentPublisher>
{
}

View File

@ -3,6 +3,7 @@ package com.metamx.druid.realtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DbTablesConfig;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
@ -17,12 +18,12 @@ public class DbSegmentPublisher implements SegmentPublisher
private static final Logger log = new Logger(DbSegmentPublisher.class);
private final ObjectMapper jsonMapper;
private final DbSegmentPublisherConfig config;
private final DbTablesConfig config;
private final IDBI dbi;
public DbSegmentPublisher(
ObjectMapper jsonMapper,
DbSegmentPublisherConfig config,
DbTablesConfig config,
IDBI dbi
)
{
@ -41,7 +42,7 @@ public class DbSegmentPublisher implements SegmentPublisher
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format("SELECT id FROM %s WHERE id=:id", config.getSegmentTable())
String.format("SELECT id FROM %s WHERE id=:id", config.getSegmentsTable())
)
.bind("id", segment.getIdentifier())
.list();
@ -65,13 +66,13 @@ public class DbSegmentPublisher implements SegmentPublisher
statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentTable()
config.getSegmentsTable()
);
} else {
statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentTable()
config.getSegmentsTable()
);
}

View File

@ -1,9 +1,28 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime;
import org.skife.config.Config;
public abstract class DbSegmentPublisherConfig
{
@Config("druid.database.segmentTable")
@Config("druid.db.tables.segmentTable")
public abstract String getSegmentTable();
}

View File

@ -21,6 +21,7 @@ package com.metamx.druid.realtime;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.inject.Inject;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.plumber.Plumber;

View File

@ -19,34 +19,17 @@
package com.metamx.druid.realtime;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.client.DataSegment;
import java.io.IOException;
/**
*/
public class RealtimeMain
public class NoopSegmentPublisher implements SegmentPublisher
{
private static final Logger log = new Logger(RealtimeMain.class);
public static void main(String[] args) throws Exception
@Override
public void publishSegment(DataSegment segment) throws IOException
{
LogLevelAdjuster.register();
Lifecycle lifecycle = new Lifecycle();
lifecycle.addManagedInstance(
RealtimeNode.builder().build()
);
try {
lifecycle.start();
}
catch (Throwable t) {
log.info(t, "Throwable caught at startup, committing seppuku");
System.exit(2);
}
lifecycle.join();
// do nothing
}
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.inject.Inject;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
@ -60,6 +61,7 @@ public class RealtimeManager implements QuerySegmentWalker
private final Map<String, FireChief> chiefs;
@Inject
public RealtimeManager(
List<FireDepartment> fireDepartments,
QueryRunnerFactoryConglomerate conglomerate

View File

@ -1,298 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.Monitor;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
*/
public class RealtimeNode extends BaseServerNode<RealtimeNode>
{
private static final Logger log = new Logger(RealtimeNode.class);
public static Builder builder()
{
return new Builder();
}
private final Map<String, Object> injectablesMap = Maps.newLinkedHashMap();
private SegmentPublisher segmentPublisher = null;
private DataSegmentPusher dataSegmentPusher = null;
private List<FireDepartment> fireDepartments = null;
private boolean initialized = false;
public RealtimeNode(
Properties props,
Lifecycle lifecycle,
ObjectMapper jsonMapper,
ObjectMapper smileMapper,
ConfigurationObjectFactory configFactory
)
{
super("realtime", log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
public RealtimeNode setSegmentPublisher(SegmentPublisher segmentPublisher)
{
checkFieldNotSetAndSet("segmentPublisher", segmentPublisher);
return this;
}
public RealtimeNode setDataSegmentPusher(DataSegmentPusher dataSegmentPusher)
{
checkFieldNotSetAndSet("dataSegmentPusher", dataSegmentPusher);
return this;
}
public RealtimeNode setFireDepartments(List<FireDepartment> fireDepartments)
{
checkFieldNotSetAndSet("fireDepartments", fireDepartments);
return this;
}
public RealtimeNode registerJacksonInjectable(String name, Object object)
{
Preconditions.checkState(injectablesMap.containsKey(name), "Already registered jackson object[%s]", name);
injectablesMap.put(name, object);
return this;
}
public SegmentPublisher getSegmentPublisher()
{
initializeSegmentPublisher();
return segmentPublisher;
}
public DataSegmentPusher getDataSegmentPusher()
{
initializeSegmentPusher();
return dataSegmentPusher;
}
public List<FireDepartment> getFireDepartments()
{
initializeFireDepartments();
return fireDepartments;
}
protected void doInit() throws Exception
{
initializeJacksonInjectables();
final Lifecycle lifecycle = getLifecycle();
final ServiceEmitter emitter = getEmitter();
final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();
final List<Monitor> monitors = getMonitors();
final List<FireDepartment> departments = getFireDepartments();
monitors.add(new RealtimeMetricsMonitor(departments));
final RealtimeManager realtimeManager = new RealtimeManager(departments, conglomerate);
lifecycle.addManagedInstance(realtimeManager);
startMonitoring(monitors);
final ServletContextHandler root = new ServletContextHandler(getServer(), "/", ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(
new ServletHolder(
new QueryServlet(getJsonMapper(), getSmileMapper(), realtimeManager, emitter, getRequestLogger())
),
"/druid/v2/*"
);
initialized = true;
}
@LifecycleStart
public synchronized void start() throws Exception
{
if (! initialized) {
init();
}
getLifecycle().start();
}
@LifecycleStop
public synchronized void stop()
{
getLifecycle().stop();
}
protected void initializeJacksonInjectables()
{
final Map<String, Object> injectables = Maps.newHashMap();
for (Map.Entry<String, Object> entry : injectablesMap.entrySet()) {
injectables.put(entry.getKey(), entry.getValue());
}
injectables.put("queryRunnerFactoryConglomerate", getConglomerate());
injectables.put("segmentPusher", getDataSegmentPusher());
injectables.put("segmentAnnouncer", getAnnouncer());
injectables.put("segmentPublisher", getSegmentPublisher());
injectables.put("serverView", getServerView());
injectables.put("serviceEmitter", getEmitter());
getJsonMapper().setInjectableValues(
new InjectableValues()
{
@Override
public Object findInjectableValue(
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
)
{
return injectables.get(valueId);
}
}
);
}
private void initializeFireDepartments()
{
if (fireDepartments == null) {
try {
setFireDepartments(
getJsonMapper().<List<FireDepartment>>readValue(
new File(PropUtils.getProperty(getProps(), "druid.realtime.specFile")),
new TypeReference<List<FireDepartment>>(){}
)
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
private void initializeSegmentPusher()
{
if (dataSegmentPusher == null) {
dataSegmentPusher = ServerInit.getSegmentPusher(getProps(), getConfigFactory(), getJsonMapper());
}
}
protected void initializeSegmentPublisher()
{
if (segmentPublisher == null) {
final DbSegmentPublisherConfig dbSegmentPublisherConfig = getConfigFactory().build(DbSegmentPublisherConfig.class);
segmentPublisher = new DbSegmentPublisher(
getJsonMapper(),
dbSegmentPublisherConfig,
new DbConnector(Suppliers.ofInstance(getConfigFactory().build(DbConnectorConfig.class)), null).getDBI() // TODO
);
getLifecycle().addManagedInstance(segmentPublisher);
}
}
public static class Builder
{
private ObjectMapper jsonMapper = null;
private ObjectMapper smileMapper = null;
private Lifecycle lifecycle = null;
private Properties props = null;
private ConfigurationObjectFactory configFactory = null;
public Builder withMappers(ObjectMapper jsonMapper, ObjectMapper smileMapper)
{
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
return this;
}
public Builder withProps(Properties props)
{
this.props = props;
return this;
}
public Builder withConfigFactory(ConfigurationObjectFactory configFactory)
{
this.configFactory = configFactory;
return this;
}
public RealtimeNode build()
{
if (jsonMapper == null && smileMapper == null) {
jsonMapper = new DefaultObjectMapper();
smileMapper = new DefaultObjectMapper(new SmileFactory());
smileMapper.getJsonFactory().setCodec(smileMapper);
}
else if (jsonMapper == null || smileMapper == null) {
throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper);
}
if (lifecycle == null) {
lifecycle = new Lifecycle();
}
if (props == null) {
props = Initialization.loadProperties();
}
if (configFactory == null) {
configFactory = Config.createFactory(props);
}
return new RealtimeNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
}
}
}

View File

@ -26,7 +26,7 @@ import java.io.Closeable;
/**
* This is an interface that holds onto the stream of incoming data. Realtime data ingestion is built around this
* abstraction. In order to add a new type of source for realtime data ingestion, all you need to do is implement
* one of these and register it with the RealtimeMain.
* one of these and register it with the Main.
*
* This object acts a lot like an Iterator, but it doesn't extend the Iterator interface because it extends
* Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this

View File

@ -68,6 +68,7 @@ import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.sun.istack.internal.NotNull;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@ -98,14 +99,31 @@ public class RealtimePlumberSchool implements PlumberSchool
private final IndexGranularity segmentGranularity;
private final Object handoffCondition = new Object();
@JacksonInject
@NotNull
private ServiceEmitter emitter;
private volatile VersioningPolicy versioningPolicy = null;
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
@JacksonInject
@NotNull
private volatile QueryRunnerFactoryConglomerate conglomerate = null;
@JacksonInject
@NotNull
private volatile DataSegmentPusher dataSegmentPusher = null;
@JacksonInject
@NotNull
private volatile DataSegmentAnnouncer segmentAnnouncer = null;
@JacksonInject
@NotNull
private volatile SegmentPublisher segmentPublisher = null;
@JacksonInject
@NotNull
private volatile ServerView serverView = null;
@JsonCreator
@ -138,42 +156,6 @@ public class RealtimePlumberSchool implements PlumberSchool
this.rejectionPolicyFactory = factory;
}
@JacksonInject("queryRunnerFactoryConglomerate")
public void setConglomerate(QueryRunnerFactoryConglomerate conglomerate)
{
this.conglomerate = conglomerate;
}
@JacksonInject("segmentPusher")
public void setDataSegmentPusher(DataSegmentPusher dataSegmentPusher)
{
this.dataSegmentPusher = dataSegmentPusher;
}
@JacksonInject("segmentAnnouncer")
public void setSegmentAnnouncer(DataSegmentAnnouncer segmentAnnouncer)
{
this.segmentAnnouncer = segmentAnnouncer;
}
@JacksonInject("segmentPublisher")
public void setSegmentPublisher(SegmentPublisher segmentPublisher)
{
this.segmentPublisher = segmentPublisher;
}
@JacksonInject("serverView")
public void setServerView(ServerView serverView)
{
this.serverView = serverView;
}
@JacksonInject("serviceEmitter")
public void setServiceEmitter(ServiceEmitter emitter)
{
this.emitter = emitter;
}
@Override
public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics)
{

View File

@ -0,0 +1,47 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentPusher;
import javax.validation.constraints.NotNull;
/**
*/
public class CassandraDataSegmentPusherProvider implements DataSegmentPusherProvider
{
@JacksonInject
@NotNull
private CassandraDataSegmentConfig config = null;
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
@Override
public DataSegmentPusher get()
{
return new CassandraDataSegmentPusher(config, jsonMapper);
}
}

View File

@ -0,0 +1,83 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import com.metamx.druid.loading.DataSegmentPuller;
import com.metamx.druid.loading.HdfsDataSegmentPuller;
import com.metamx.druid.loading.LocalDataSegmentPuller;
import com.metamx.druid.loading.OmniSegmentLoader;
import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentPuller;
import org.apache.hadoop.conf.Configuration;
/**
*/
public class DataSegmentPullerModule implements Module
{
@Override
public void configure(Binder binder)
{
binder.bind(SegmentLoader.class).to(OmniSegmentLoader.class).in(LazySingleton.class);
bindDeepStorageLocal(binder);
bindDeepStorageS3(binder);
bindDeepStorageHdfs(binder);
bindDeepStorageCassandra(binder);
}
private static void bindDeepStorageLocal(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("local").to(LocalDataSegmentPuller.class).in(LazySingleton.class);
}
private static void bindDeepStorageS3(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class);
}
private static void bindDeepStorageHdfs(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class);
binder.bind(Configuration.class).toInstance(new Configuration());
}
private static void bindDeepStorageCassandra(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("c*").to(CassandraDataSegmentPuller.class).in(LazySingleton.class);
ConfigProvider.bind(binder, CassandraDataSegmentConfig.class);
}
}

View File

@ -0,0 +1,30 @@
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.HdfsDataSegmentPusherConfig;
import com.metamx.druid.loading.LocalDataSegmentPusherConfig;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig;
import org.apache.hadoop.conf.Configuration;
/**
*/
public class DataSegmentPusherModule implements Module
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.pusher", DataSegmentPusherProvider.class);
JsonConfigProvider.bind(binder, "druid.pusher.s3", S3DataSegmentPusherConfig.class);
binder.bind(Configuration.class).toInstance(new Configuration());
JsonConfigProvider.bind(binder, "druid.pusher.hdfs", HdfsDataSegmentPusherConfig.class);
JsonConfigProvider.bind(binder, "druid.pusher.cassandra", CassandraDataSegmentConfig.class);
binder.bind(DataSegmentPusher.class).toProvider(DataSegmentPusherProvider.class);
}
}

View File

@ -0,0 +1,37 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.inject.Provider;
import com.metamx.druid.loading.DataSegmentPusher;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LocalDataSegmentPusherProvider.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "s3_zip", value = S3DataSegmentPusherProvider.class),
@JsonSubTypes.Type(name = "hdfs", value = HdfsDataSegmentPusherProvider.class),
@JsonSubTypes.Type(name = "c*", value = CassandraDataSegmentPusherProvider.class)
})
public interface DataSegmentPusherProvider extends Provider<DataSegmentPusher>
{
}

View File

@ -0,0 +1,136 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.logger.Logger;
import com.metamx.druid.DruidProcessingConfig;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.guice.annotations.Global;
import com.metamx.druid.guice.annotations.Processing;
import com.metamx.druid.query.MetricsEmittingExecutorService;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class DruidProcessingModule implements Module
{
private static final Logger log = new Logger(DruidProcessingModule.class);
@Override
public void configure(Binder binder)
{
ConfigProvider.bind(binder, DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing"));
binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);
}
@Provides
@Processing
@ManageLifecycle
public ExecutorService getProcessingExecutorService(ExecutorServiceConfig config, ServiceEmitter emitter)
{
return new MetricsEmittingExecutorService(
Executors.newFixedThreadPool(config.getNumThreads(), Execs.makeThreadFactory(config.getFormatString())),
emitter,
new ServiceMetricEvent.Builder()
);
}
@Provides
@LazySingleton
@Global
public StupidPool<ByteBuffer> getIntermediateResultsPool(DruidProcessingConfig config)
{
try {
Class<?> vmClass = Class.forName("sun.misc.VM");
Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null);
if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) {
log.info("Cannot determine maxDirectMemory from[%s]", maxDirectMemoryObj);
} else {
long maxDirectMemory = ((Number) maxDirectMemoryObj).longValue();
final long memoryNeeded = (long) config.intermediateComputeSizeBytes() * (config.getNumThreads() + 1);
if (maxDirectMemory < memoryNeeded) {
throw new ProvisionException(
String.format(
"Not enough direct memory. Please adjust -XX:MaxDirectMemorySize or druid.computation.buffer.size: "
+ "maxDirectMemory[%,d], memoryNeeded[%,d], druid.computation.buffer.size[%,d], numThreads[%,d]",
maxDirectMemory, memoryNeeded, config.intermediateComputeSizeBytes(), config.getNumThreads()
)
);
}
}
}
catch (ClassNotFoundException e) {
log.info("No VM class, cannot do memory check.");
}
catch (NoSuchMethodException e) {
log.info("VM.maxDirectMemory doesn't exist, cannot do memory check.");
}
catch (InvocationTargetException e) {
log.warn(e, "static method shouldn't throw this");
}
catch (IllegalAccessException e) {
log.warn(e, "public method, shouldn't throw this");
}
return new IntermediateProcessingBufferPool(config.intermediateComputeSizeBytes());
}
private static class IntermediateProcessingBufferPool extends StupidPool<ByteBuffer>
{
private static final Logger log = new Logger(IntermediateProcessingBufferPool.class);
public IntermediateProcessingBufferPool(final int computationBufferSize)
{
super(
new Supplier<ByteBuffer>()
{
final AtomicLong count = new AtomicLong(0);
@Override
public ByteBuffer get()
{
log.info(
"Allocating new intermediate processing buffer[%,d] of size[%,d]",
count.getAndIncrement(), computationBufferSize
);
return ByteBuffer.allocateDirect(computationBufferSize);
}
}
);
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.HdfsDataSegmentPusher;
import com.metamx.druid.loading.HdfsDataSegmentPusherConfig;
import org.apache.hadoop.conf.Configuration;
import javax.validation.constraints.NotNull;
/**
*/
public class HdfsDataSegmentPusherProvider implements DataSegmentPusherProvider
{
@JacksonInject
@NotNull
private HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConfig = null;
@JacksonInject
@NotNull
private Configuration config = null;
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
@Override
public DataSegmentPusher get()
{
return new HdfsDataSegmentPusher(hdfsDataSegmentPusherConfig, config, jsonMapper);
}
}

View File

@ -1,59 +1,29 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import com.google.inject.multibindings.MapBinder;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.logger.Logger;
import com.metamx.druid.DruidProcessingConfig;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncerProvider;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.coordination.ServerManager;
import com.metamx.druid.coordination.SingleDataSegmentAnnouncer;
import com.metamx.druid.coordination.ZkCoordinator;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.guice.annotations.Global;
import com.metamx.druid.guice.annotations.Processing;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.druid.initialization.BatchDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.loading.DataSegmentPuller;
import com.metamx.druid.loading.HdfsDataSegmentPuller;
import com.metamx.druid.loading.LocalDataSegmentPuller;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
import com.metamx.druid.loading.OmniSegmentLoader;
import com.metamx.druid.loading.QueryableIndexFactory;
import com.metamx.druid.loading.S3CredentialsConfig;
import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentPuller;
import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
import com.metamx.druid.query.MetricsEmittingExecutorService;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.conf.Configuration;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
@ -64,161 +34,7 @@ public class HistoricalModule implements Module
@Override
public void configure(Binder binder)
{
ConfigProvider.bind(binder, DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing"));
binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);
JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class);
JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class);
binder.bind(ServerManager.class).in(LazySingleton.class);
binder.bind(SegmentLoader.class).to(OmniSegmentLoader.class).in(LazySingleton.class);
binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class);
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("local").to(LocalDataSegmentPuller.class).in(LazySingleton.class);
bindDeepStorageS3(binder);
bindDeepStorageHdfs(binder);
bindDeepStorageCassandra(binder);
binder.bind(QueryRunnerFactoryConglomerate.class)
.to(DefaultQueryRunnerFactoryConglomerate.class)
.in(LazySingleton.class);
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.announcer", BatchDataSegmentAnnouncerConfig.class);
JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class);
binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class);
binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleLast.class);
binder.bind(SingleDataSegmentAnnouncer.class).in(ManageLifecycleLast.class);
}
@Provides @LazySingleton
public DruidServerMetadata getMetadata(@Self DruidNode node, DruidServerConfig config)
{
return new DruidServerMetadata(node.getHost(), node.getHost(), config.getMaxSize(), "historical", config.getTier());
}
@Provides @ManageLifecycle
public Announcer getAnnouncer(CuratorFramework curator)
{
return new Announcer(curator, Execs.singleThreaded("Announcer-%s"));
}
@Provides @Processing @ManageLifecycle
public ExecutorService getProcessingExecutorService(ExecutorServiceConfig config, ServiceEmitter emitter)
{
return new MetricsEmittingExecutorService(
Executors.newFixedThreadPool(config.getNumThreads(), Execs.makeThreadFactory(config.getFormatString())),
emitter,
new ServiceMetricEvent.Builder()
);
}
@Provides @LazySingleton
public RestS3Service getRestS3Service(S3CredentialsConfig config)
{
try {
return new RestS3Service(new AWSCredentials(config.getAccessKey(), config.getSecretKey()));
}
catch (S3ServiceException e) {
throw new ProvisionException("Unable to create a RestS3Service", e);
}
}
@Provides @LazySingleton @Global
public StupidPool<ByteBuffer> getIntermediateResultsPool(DruidProcessingConfig config)
{
try {
Class<?> vmClass = Class.forName("sun.misc.VM");
Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null);
if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) {
log.info("Cannot determine maxDirectMemory from[%s]", maxDirectMemoryObj);
} else {
long maxDirectMemory = ((Number) maxDirectMemoryObj).longValue();
final long memoryNeeded = (long) config.intermediateComputeSizeBytes() * (config.getNumThreads() + 1);
if (maxDirectMemory < memoryNeeded) {
throw new ProvisionException(
String.format(
"Not enough direct memory. Please adjust -XX:MaxDirectMemorySize or druid.computation.buffer.size: "
+ "maxDirectMemory[%,d], memoryNeeded[%,d], druid.computation.buffer.size[%,d], numThreads[%,d]",
maxDirectMemory, memoryNeeded, config.intermediateComputeSizeBytes(), config.getNumThreads()
)
);
}
}
}
catch (ClassNotFoundException e) {
log.info("No VM class, cannot do memory check.");
}
catch (NoSuchMethodException e) {
log.info("VM.maxDirectMemory doesn't exist, cannot do memory check.");
}
catch (InvocationTargetException e) {
log.warn(e, "static method shouldn't throw this");
}
catch (IllegalAccessException e) {
log.warn(e, "public method, shouldn't throw this");
}
return new IntermediateProcessingBufferPool(config.intermediateComputeSizeBytes());
}
private static void bindDeepStorageS3(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.s3", S3CredentialsConfig.class);
}
private static void bindDeepStorageHdfs(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class);
binder.bind(Configuration.class).toInstance(new Configuration());
}
private static void bindDeepStorageCassandra(Binder binder)
{
final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);
segmentPullerBinder.addBinding("c*").to(CassandraDataSegmentPuller.class).in(LazySingleton.class);
ConfigProvider.bind(binder, CassandraDataSegmentConfig.class);
}
private static class IntermediateProcessingBufferPool extends StupidPool<ByteBuffer>
{
private static final Logger log = new Logger(IntermediateProcessingBufferPool.class);
public IntermediateProcessingBufferPool(final int computationBufferSize)
{
super(
new Supplier<ByteBuffer>()
{
final AtomicLong count = new AtomicLong(0);
@Override
public ByteBuffer get()
{
log.info(
"Allocating new intermediate processing buffer[%,d] of size[%,d]",
count.getAndIncrement(), computationBufferSize
);
return ByteBuffer.allocateDirect(computationBufferSize);
}
}
);
}
}
}

View File

@ -0,0 +1,43 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.LocalDataSegmentPusher;
import com.metamx.druid.loading.LocalDataSegmentPusherConfig;
import javax.validation.constraints.NotNull;
/**
*/
public class LocalDataSegmentPusherProvider extends LocalDataSegmentPusherConfig implements DataSegmentPusherProvider
{
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
@Override
public DataSegmentPusher get()
{
return new LocalDataSegmentPusher(this, jsonMapper);
}
}

View File

@ -0,0 +1,52 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import javax.validation.constraints.NotNull;
/**
*/
public class S3DataSegmentPusherProvider implements DataSegmentPusherProvider
{
@JacksonInject
@NotNull
private RestS3Service restS3Service = null;
@JacksonInject
@NotNull
private S3DataSegmentPusherConfig config = null;
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
@Override
public DataSegmentPusher get()
{
return new S3DataSegmentPusher(restS3Service, config, jsonMapper);
}
}

View File

@ -0,0 +1,52 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import com.metamx.druid.loading.S3CredentialsConfig;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
/**
*/
public class S3Module implements Module
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.s3", S3CredentialsConfig.class);
}
@Provides
@LazySingleton
public RestS3Service getRestS3Service(S3CredentialsConfig config)
{
try {
return new RestS3Service(new AWSCredentials(config.getAccessKey(), config.getSecretKey()));
}
catch (S3ServiceException e) {
throw new ProvisionException("Unable to create a RestS3Service", e);
}
}
}

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.google.inject.Binder;

View File

@ -0,0 +1,72 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Provides;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
import com.metamx.druid.loading.QueryableIndexFactory;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
/**
*/
public class StorageNodeModule extends ServerModule
{
private final String nodeType;
public StorageNodeModule(String nodeType)
{
this.nodeType = nodeType;
}
@Override
public void configure(Binder binder)
{
super.configure(binder);
JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class);
JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class);
binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class);
binder.bind(QueryRunnerFactoryConglomerate.class)
.to(DefaultQueryRunnerFactoryConglomerate.class)
.in(LazySingleton.class);
}
@Provides
@LazySingleton
public DruidServerMetadata getMetadata(@Self DruidNode node, DruidServerConfig config)
{
return new DruidServerMetadata(
node.getHost(),
node.getHost(),
config.getMaxSize(),
nodeType,
config.getTier()
);
}
}

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -5,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.OutputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
@ -28,6 +48,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
private final Configuration hadoopConfig;
private final ObjectMapper jsonMapper;
@Inject
public HdfsDataSegmentPusher(
HdfsDataSegmentPusherConfig config,
Configuration hadoopConfig,

View File

@ -19,12 +19,17 @@
package com.metamx.druid.loading;
import org.skife.config.Config;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public abstract class HdfsDataSegmentPusherConfig
{
@Config("druid.pusher.hdfs.storageDirectory")
public abstract String getStorageDirectory();
@JsonProperty
public String storageDirectory = "";
public String getStorageDirectory()
{
return storageDirectory;
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
@ -40,6 +41,7 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
private final LocalDataSegmentPusherConfig config;
private final ObjectMapper jsonMapper;
@Inject
public LocalDataSegmentPusher(
LocalDataSegmentPusherConfig config,
ObjectMapper jsonMapper

View File

@ -19,14 +19,20 @@
package com.metamx.druid.loading;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.skife.config.Config;
import java.io.File;
/**
*/
public abstract class LocalDataSegmentPusherConfig
public class LocalDataSegmentPusherConfig
{
@Config("druid.pusher.local.storageDirectory")
public abstract File getStorageDirectory();
@JsonProperty
public File storageDirectory;
public File getStorageDirectory()
{
return storageDirectory;
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.inject.Inject;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.utils.CompressionUtils;
@ -46,6 +47,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
private final S3DataSegmentPusherConfig config;
private final ObjectMapper jsonMapper;
@Inject
public S3DataSegmentPusher(
RestS3Service s3Client,
S3DataSegmentPusherConfig config,

View File

@ -19,21 +19,35 @@
package com.metamx.druid.loading;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
public abstract class S3DataSegmentPusherConfig
public class S3DataSegmentPusherConfig
{
@Config("druid.pusher.s3.bucket")
public abstract String getBucket();
@JsonProperty
public String bucket = "";
@Config("druid.pusher.s3.baseKey")
@Default("")
public abstract String getBaseKey();
@JsonProperty
public String baseKey = "";
@Config("druid.pusher.s3.disableAcl")
@Default("false")
public abstract boolean getDisableAcl();
@JsonProperty
public boolean disableAcl = false;
public String getBucket()
{
return bucket;
}
public String getBaseKey()
{
return baseKey;
}
public boolean getDisableAcl()
{
return disableAcl;
}
}

View File

@ -19,8 +19,7 @@
package com.metamx.druid.loading.cassandra;
import org.skife.config.Config;
import org.skife.config.Default;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* Cassandra Config
@ -29,11 +28,19 @@ import org.skife.config.Default;
*/
public abstract class CassandraDataSegmentConfig
{
@Config("druid.pusher.cassandra.host")
@Default("")
public abstract String getHost();
@JsonProperty
public String host = "";
@Config("druid.pusher.cassandra.keyspace")
@Default("")
public abstract String getKeyspace();
@JsonProperty
public String keyspace = "";
public String getKeyspace()
{
return keyspace;
}
public String getHost()
{
return host;
}
}

View File

@ -1,12 +1,28 @@
package com.metamx.druid.loading.cassandra;
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
package com.metamx.druid.loading.cassandra;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
@ -16,6 +32,10 @@ import com.metamx.druid.utils.CompressionUtils;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
/**
* Cassandra Segment Pusher
*
@ -28,6 +48,7 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private final ObjectMapper jsonMapper;
@Inject
public CassandraDataSegmentPusher(
CassandraDataSegmentConfig config,
ObjectMapper jsonMapper)

View File

@ -1,10 +1,27 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.metrics;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.metamx.metrics.JvmMonitor;
import com.google.common.collect.Lists;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.SysMonitor;
import javax.validation.constraints.NotNull;
import java.util.List;
@ -13,12 +30,9 @@ import java.util.List;
*/
public class MonitorsConfig
{
@JsonProperty("monitorExclusions")
@JsonProperty("monitors")
@NotNull
private List<Class<? extends Monitor>> monitors = ImmutableList.<Class<? extends Monitor>>builder()
.add(JvmMonitor.class)
.add(SysMonitor.class)
.build();
private List<Class<? extends Monitor>> monitors = Lists.newArrayList();
public List<Class<? extends Monitor>> getMonitors()
{

View File

@ -42,6 +42,11 @@
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-examples</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>airline</artifactId>

View File

@ -1,7 +1,27 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import com.google.inject.Injector;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.cache.CacheMonitor;
import com.metamx.druid.curator.CuratorModule;
import com.metamx.druid.guice.BrokerModule;
import com.metamx.druid.guice.HttpClientModule;
@ -42,7 +62,7 @@ public class CliBroker extends ServerRunnable
EmitterModule.class,
HttpClientModule.global(),
CuratorModule.class,
new MetricsModule(),
new MetricsModule().register(CacheMonitor.class),
ServerModule.class,
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import com.google.inject.Injector;
@ -5,12 +24,18 @@ import com.metamx.common.logger.Logger;
import com.metamx.druid.coordination.ServerManager;
import com.metamx.druid.coordination.ZkCoordinator;
import com.metamx.druid.curator.CuratorModule;
import com.metamx.druid.guice.AnnouncerModule;
import com.metamx.druid.guice.DataSegmentPullerModule;
import com.metamx.druid.guice.DataSegmentPusherModule;
import com.metamx.druid.guice.DruidProcessingModule;
import com.metamx.druid.guice.HistoricalModule;
import com.metamx.druid.guice.HttpClientModule;
import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.QueryRunnerFactoryModule;
import com.metamx.druid.guice.QueryableModule;
import com.metamx.druid.guice.S3Module;
import com.metamx.druid.guice.ServerModule;
import com.metamx.druid.guice.StorageNodeModule;
import com.metamx.druid.http.StatusResource;
import com.metamx.druid.initialization.EmitterModule;
import com.metamx.druid.initialization.Initialization;
@ -42,8 +67,12 @@ public class CliHistorical extends ServerRunnable
EmitterModule.class,
HttpClientModule.global(),
CuratorModule.class,
AnnouncerModule.class,
DruidProcessingModule.class,
S3Module.class,
DataSegmentPullerModule.class,
new MetricsModule().register(ServerMonitor.class),
ServerModule.class,
new StorageNodeModule("historical"),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),
new QueryableModule(ServerManager.class),

View File

@ -0,0 +1,86 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import com.google.inject.Injector;
import com.metamx.common.logger.Logger;
import com.metamx.druid.DruidProcessingConfig;
import com.metamx.druid.curator.CuratorModule;
import com.metamx.druid.guice.AnnouncerModule;
import com.metamx.druid.guice.DataSegmentPusherModule;
import com.metamx.druid.guice.DbConnectorModule;
import com.metamx.druid.guice.DruidProcessingModule;
import com.metamx.druid.guice.HttpClientModule;
import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.QueryRunnerFactoryModule;
import com.metamx.druid.guice.QueryableModule;
import com.metamx.druid.guice.RealtimeModule;
import com.metamx.druid.guice.S3Module;
import com.metamx.druid.guice.ServerModule;
import com.metamx.druid.guice.ServerViewModule;
import com.metamx.druid.guice.StorageNodeModule;
import com.metamx.druid.http.StatusResource;
import com.metamx.druid.initialization.EmitterModule;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.JettyServerModule;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.metrics.MetricsModule;
import com.metamx.druid.realtime.RealtimeManager;
import io.airlift.command.Command;
/**
*/
@Command(
name = "realtime",
description = "Runs a realtime node, see https://github.com/metamx/druid/wiki/Realtime for a description"
)
public class CliRealtime extends ServerRunnable
{
private static final Logger log = new Logger(CliBroker.class);
public CliRealtime()
{
super(log);
}
@Override
protected Injector getInjector()
{
return Initialization.makeInjector(
new LifecycleModule(),
EmitterModule.class,
DbConnectorModule.class,
HttpClientModule.global(),
CuratorModule.class,
AnnouncerModule.class,
DruidProcessingModule.class,
S3Module.class,
DataSegmentPusherModule.class,
new MetricsModule(),
new StorageNodeModule("realtime"),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),
new ServerViewModule(),
new QueryableModule(RealtimeManager.class),
new QueryRunnerFactoryModule(),
RealtimeModule.class
);
}
}

View File

@ -0,0 +1,67 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import com.google.inject.Injector;
import com.metamx.common.logger.Logger;
import com.metamx.druid.guice.DruidProcessingModule;
import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.QueryRunnerFactoryModule;
import com.metamx.druid.guice.QueryableModule;
import com.metamx.druid.guice.StorageNodeModule;
import com.metamx.druid.http.StatusResource;
import com.metamx.druid.initialization.EmitterModule;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.JettyServerModule;
import com.metamx.druid.realtime.RealtimeManager;
import druid.examples.guice.RealtimeExampleModule;
import io.airlift.command.Command;
/**
*/
@Command(
name = "example realtime",
description = "Runs a standalone realtime node for examples, see https://github.com/metamx/druid/wiki/Realtime for a description"
)
public class CliRealtimeExample extends ServerRunnable
{
private static final Logger log = new Logger(CliBroker.class);
public CliRealtimeExample()
{
super(log);
}
@Override
protected Injector getInjector()
{
return Initialization.makeInjector(
new LifecycleModule(),
EmitterModule.class,
DruidProcessingModule.class,
new StorageNodeModule("realtime"),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),
new QueryableModule(RealtimeManager.class),
new QueryRunnerFactoryModule(),
RealtimeExampleModule.class
);
}
}

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import io.airlift.command.Cli;
@ -19,7 +38,7 @@ public class Main
builder.withGroup("server")
.withDescription("Run one of the Druid server types.")
.withDefaultCommand(Help.class)
.withCommands(CliCoordinator.class, CliHistorical.class, CliBroker.class);
.withCommands(CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class, CliRealtimeExample.class);
builder.build().parse(args).run();
}