1) Make the demos work again.

2) First step in restructuring the demos into a single repository
This commit is contained in:
cheddar 2013-04-30 18:13:55 -05:00
parent e1367f256b
commit 8922adb1f2
19 changed files with 207 additions and 89 deletions

View File

@ -34,8 +34,10 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.client.ServerInventoryThingieConfig;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
@ -95,6 +97,8 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
private ScheduledExecutorFactory scheduledExecutorFactory = null;
private RequestLogger requestLogger = null;
private ServerInventoryView serverInventoryView = null;
private ServerView serverView = null;
private InventoryView inventoryView = null;
private boolean initialized = false;
@ -190,9 +194,16 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
}
@SuppressWarnings("unchecked")
public T setServerInventoryView(ServerInventoryView serverInventoryView)
public T setInventoryView(InventoryView inventoryView)
{
checkFieldNotSetAndSet("serverInventoryView", serverInventoryView);
checkFieldNotSetAndSet("inventoryView", inventoryView);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setServerView(ServerView serverView)
{
checkFieldNotSetAndSet("serverView", serverView);
return (T) this;
}
@ -291,10 +302,16 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
return requestLogger;
}
public ServerInventoryView getServerInventoryView()
public ServerView getServerView()
{
initializeServerInventoryThingie();
return serverInventoryView;
initializeServerView();
return serverView;
}
public InventoryView getInventoryView()
{
initializeInventoryView();
return inventoryView;
}
private void initializeDruidServerMetadata()
@ -313,20 +330,34 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
}
}
private void initializeServerInventoryThingie()
private void initializeServerView()
{
if (serverView == null) {
initializeServerInventoryView();
serverView = serverInventoryView;
}
}
private void initializeInventoryView()
{
if (inventoryView == null) {
initializeServerInventoryView();
inventoryView = serverInventoryView;
}
}
private void initializeServerInventoryView()
{
if (serverInventoryView == null) {
final ExecutorService exec = Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build()
);
setServerInventoryView(
new ServerInventoryView(
getConfigFactory().build(ServerInventoryThingieConfig.class),
getZkPaths(),
getCuratorFramework(),
exec,
getJsonMapper()
)
serverInventoryView = new ServerInventoryView(
getConfigFactory().build(ServerInventoryViewConfig.class),
getZkPaths(),
getCuratorFramework(),
exec,
getJsonMapper()
);
lifecycle.addManagedInstance(serverInventoryView);
}
@ -337,7 +368,7 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
if (requestLogger == null) {
try {
final String loggingType = props.getProperty("druid.request.logging.type");
if(loggingType.equals("emitter")) {
if("emitter".equals(loggingType)) {
setRequestLogger(Initialization.makeEmittingRequestLogger(
getProps(),
getEmitter()

View File

@ -0,0 +1,9 @@
package com.metamx.druid.client;
/**
*/
public interface InventoryView
{
public DruidServer getInventoryValue(String string);
public Iterable<DruidServer> getInventory();
}

View File

@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
*/
public class ServerInventoryView implements ServerView
public class ServerInventoryView implements ServerView, InventoryView
{
private static final EmittingLogger log = new EmittingLogger(ServerInventoryView.class);
@ -56,7 +56,7 @@ public class ServerInventoryView implements ServerView
private static final Map<String, Integer> removedSegments = new MapMaker().makeMap();
public ServerInventoryView(
final ServerInventoryThingieConfig config,
final ServerInventoryViewConfig config,
final ZkPathsConfig zkPaths,
final CuratorFramework curator,
final ExecutorService exec,
@ -236,11 +236,13 @@ public class ServerInventoryView implements ServerView
return started.get();
}
@Override
public DruidServer getInventoryValue(String containerKey)
{
return inventoryManager.getInventoryValue(containerKey);
}
@Override
public Iterable<DruidServer> getInventory()
{
return inventoryManager.getInventory();

View File

@ -24,7 +24,7 @@ import org.skife.config.Default;
/**
*/
public abstract class ServerInventoryThingieConfig
public abstract class ServerInventoryViewConfig
{
@Config("druid.master.removedSegmentLifetime")
@Default("1")

View File

@ -193,7 +193,7 @@ public class BrokerNode extends QueryableNode<BrokerNode>
1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("BrokerServerView-%s").build()
);
final BrokerServerView view = new BrokerServerView(
warehouse, getSmileMapper(), brokerHttpClient, getServerInventoryView(), viewExec
warehouse, getSmileMapper(), brokerHttpClient, getServerView(), viewExec
);
final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cache, getSmileMapper());
@ -202,7 +202,7 @@ public class BrokerNode extends QueryableNode<BrokerNode>
final ClientQuerySegmentWalker texasRanger = new ClientQuerySegmentWalker(warehouse, getEmitter(), baseClient);
List<Module> theModules = Lists.newArrayList();
theModules.add(new ClientServletModule(texasRanger, getServerInventoryView(), getJsonMapper()));
theModules.add(new ClientServletModule(texasRanger, getInventoryView(), getJsonMapper()));
theModules.addAll(extraModules);
final Injector injector = Guice.createInjector(theModules);

View File

@ -27,7 +27,7 @@ import com.google.inject.Inject;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.InventoryView;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -47,11 +47,11 @@ public class ClientInfoResource
{
private static final int SEGMENT_HISTORY_MILLIS = 7 * 24 * 60 * 60 * 1000; // ONE WEEK
private ServerInventoryView serverInventoryView;
private InventoryView serverInventoryView;
@Inject
public ClientInfoResource(
ServerInventoryView serverInventoryView
InventoryView serverInventoryView
)
{
this.serverInventoryView = serverInventoryView;

View File

@ -22,7 +22,7 @@ package com.metamx.druid.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.query.segment.QuerySegmentWalker;
import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
@ -34,12 +34,12 @@ import javax.inject.Singleton;
public class ClientServletModule extends JerseyServletModule
{
private final QuerySegmentWalker texasRanger;
private final ServerInventoryView serverInventoryView;
private final InventoryView serverInventoryView;
private final ObjectMapper jsonMapper;
public ClientServletModule(
QuerySegmentWalker texasRanger,
ServerInventoryView serverInventoryView,
InventoryView serverInventoryView,
ObjectMapper jsonMapper
)
{
@ -53,7 +53,7 @@ public class ClientServletModule extends JerseyServletModule
{
bind(ClientInfoResource.class);
bind(QuerySegmentWalker.class).toInstance(texasRanger);
bind(ServerInventoryView.class).toInstance(serverInventoryView);
bind(InventoryView.class).toInstance(serverInventoryView);
serve("/*").with(GuiceContainer.class);
}

View File

@ -20,7 +20,7 @@ PID=$!
trap "kill $PID ; exit 1" 1 2 3 15
sleep 4
grep druid.examples.RandomFirehoseFactory RealtimeNode.out | awk '{ print $7,$8,$9,$10,$11,$12,$13,$14,$15 }'
grep druid.examples.rand.RandomFirehoseFactory RealtimeNode.out | awk '{ print $7,$8,$9,$10,$11,$12,$13,$14,$15 }'
wait $PID
echo "RealtimeStandaloneMain finished"

View File

@ -1,6 +1,5 @@
package druid.examples;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
@ -69,7 +68,6 @@ public class RealtimeStandaloneMain
}
);
rn.registerJacksonSubtype(new NamedType(RandomFirehoseFactory.class, "rand"));
try {
lifecycle.start();

View File

@ -0,0 +1,32 @@
[{
"schema": {
"dataSource": "randseq",
"aggregators": [
{"type": "count", "name": "events"},
{"type": "doubleSum", "name": "outColumn", "fieldName": "inColumn"}
],
"indexGranularity": "minute",
"shardSpec": {"type": "none"}
},
"config": {
"maxRowsInMemory": 50000,
"intermediatePersistPeriod": "PT10m"
},
"firehose": {
"type": "rand",
"sleepUsec": 100000,
"maxGeneratedRows": 5000000,
"seed": 0,
"nTokens": 19,
"nPerSleep": 3
},
"plumber": {
"type": "realtime",
"windowPeriod": "PT5m",
"segmentGranularity": "hour",
"basePersistDirectory": "/tmp/rand_realtime/basePersist"
}
}]

View File

@ -1,21 +1,27 @@
package druid.examples;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.realtime.RealtimeNode;
import com.metamx.druid.realtime.SegmentPublisher;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.Executor;
/** Standalone Demo Realtime process.
* Created: 20121009T2050
/**
* Standalone Demo Realtime process.
*/
public class RealtimeStandaloneMain
{
@ -30,46 +36,18 @@ public class RealtimeStandaloneMain
RealtimeNode rn = RealtimeNode.builder().build();
lifecycle.addManagedInstance(rn);
// register the Firehose
rn.registerJacksonSubtype(new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"));
final DataSegmentAnnouncer dummySegmentAnnouncer =
new DataSegmentAnnouncer()
{
@Override
public void announceSegment(DataSegment segment) throws IOException
{
// do nothing
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
// do nothing
}
};
SegmentPublisher dummySegmentPublisher =
new SegmentPublisher()
{
@Override
public void publishSegment(DataSegment segment) throws IOException
{
// do nothing
}
};
// dummySegmentPublisher will not send updates to db because standalone demo has no db
rn.setSegmentPublisher(dummySegmentPublisher);
rn.setAnnouncer(dummySegmentAnnouncer);
rn.setDataSegmentPusher(
new DataSegmentPusher()
{
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
return segment;
}
}
rn.registerJacksonSubtype(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights")
);
rn.registerJacksonSubtype(new NamedType(RandomFirehoseFactory.class, "rand"));
// Create dummy objects for the various interfaces that interact with the DB, ZK and deep storage
rn.setSegmentPublisher(new NoopSegmentPublisher());
rn.setAnnouncer(new NoopDataSegmentAnnouncer());
rn.setDataSegmentPusher(new NoopDataSegmentPusher());
rn.setServerView(new NoopServerView());
rn.setInventoryView(new NoopInventoryView());
Runtime.getRuntime().addShutdownHook(
new Thread(
@ -96,4 +74,71 @@ public class RealtimeStandaloneMain
lifecycle.join();
}
private static class NoopServerView implements ServerView
{
@Override
public void registerServerCallback(
Executor exec, ServerCallback callback
)
{
}
@Override
public void registerSegmentCallback(
Executor exec, SegmentCallback callback
)
{
}
}
private static class NoopInventoryView implements InventoryView
{
@Override
public DruidServer getInventoryValue(String string)
{
return null;
}
@Override
public Iterable<DruidServer> getInventory()
{
return ImmutableList.of();
}
}
private static class NoopDataSegmentPusher implements DataSegmentPusher
{
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
return segment;
}
}
private static class NoopSegmentPublisher implements SegmentPublisher
{
@Override
public void publishSegment(DataSegment segment) throws IOException
{
// do nothing
}
}
private static class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer
{
@Override
public void announceSegment(DataSegment segment) throws IOException
{
// do nothing
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
// do nothing
}
}
}

View File

@ -1,4 +1,4 @@
package druid.examples;
package druid.examples.rand;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -9,7 +9,6 @@ import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import twitter4j.ConnectionLifeCycleListener;
import twitter4j.HashtagEntity;
import twitter4j.Status;
@ -242,8 +241,10 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
// allow this event through, and the next hasMore() call will be false
}
}
rowCount++;
if (rowCount % 100 == 0) log.info("nextRow() has returned " + rowCount + " InputRows");
if (++rowCount % 1000 == 0) {
log.info("nextRow() has returned %,d InputRows", rowCount);
}
Status status;
try {
status = queue.take();

View File

@ -97,10 +97,10 @@ public class HadoopDruidIndexerNode
final Path s3nPath = new Path(String.format("s3n://%s", argumentSpec.substring("s3://".length())));
final FileSystem fs = s3nPath.getFileSystem(new Configuration());
String configString = CharStreams.toString(new InputSupplier<Readable>()
String configString = CharStreams.toString(new InputSupplier<InputStreamReader>()
{
@Override
public Readable getInput() throws IOException
public InputStreamReader getInput() throws IOException
{
return new InputStreamReader(fs.open(s3nPath));
}

View File

@ -357,7 +357,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
segmentPusher,
dataSegmentKiller,
getAnnouncer(),
getServerInventoryView(),
getServerView(),
getConglomerate(),
getJsonMapper()
);

View File

@ -187,7 +187,7 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
injectables.put("segmentPusher", getDataSegmentPusher());
injectables.put("segmentAnnouncer", getAnnouncer());
injectables.put("segmentPublisher", getSegmentPublisher());
injectables.put("serverView", getServerInventoryView());
injectables.put("serverView", getServerView());
injectables.put("serviceEmitter", getEmitter());
getJsonMapper().setInjectableValues(

View File

@ -29,7 +29,7 @@ import com.google.common.collect.Sets;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseSegmentManager;
@ -61,7 +61,7 @@ import java.util.TreeSet;
public class InfoResource
{
private final DruidMaster master;
private final ServerInventoryView serverInventoryView;
private final InventoryView serverInventoryView;
private final DatabaseSegmentManager databaseSegmentManager;
private final DatabaseRuleManager databaseRuleManager;
private final IndexingServiceClient indexingServiceClient;
@ -69,7 +69,7 @@ public class InfoResource
@Inject
public InfoResource(
DruidMaster master,
ServerInventoryView serverInventoryView,
InventoryView serverInventoryView,
DatabaseSegmentManager databaseSegmentManager,
DatabaseRuleManager databaseRuleManager,
IndexingServiceClient indexingServiceClient

View File

@ -32,8 +32,8 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.ServerInventoryThingieConfig;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.config.ConfigManager;
@ -129,7 +129,7 @@ public class MasterMain
1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build()
);
ServerInventoryView serverInventoryView = new ServerInventoryView(
configFactory.build(ServerInventoryThingieConfig.class), zkPaths, curatorFramework, exec, jsonMapper
configFactory.build(ServerInventoryViewConfig.class), zkPaths, curatorFramework, exec, jsonMapper
);
lifecycle.addManagedInstance(serverInventoryView);

View File

@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides;
import com.google.inject.util.Providers;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseSegmentManager;
@ -37,7 +37,7 @@ import javax.inject.Singleton;
*/
public class MasterServletModule extends JerseyServletModule
{
private final ServerInventoryView serverInventoryView;
private final InventoryView serverInventoryView;
private final DatabaseSegmentManager segmentInventoryManager;
private final DatabaseRuleManager databaseRuleManager;
private final DruidMaster master;
@ -45,7 +45,7 @@ public class MasterServletModule extends JerseyServletModule
private final IndexingServiceClient indexingServiceClient;
public MasterServletModule(
ServerInventoryView serverInventoryView,
InventoryView serverInventoryView,
DatabaseSegmentManager segmentInventoryManager,
DatabaseRuleManager databaseRuleManager,
DruidMaster master,
@ -66,7 +66,7 @@ public class MasterServletModule extends JerseyServletModule
{
bind(InfoResource.class);
bind(MasterResource.class);
bind(ServerInventoryView.class).toInstance(serverInventoryView);
bind(InventoryView.class).toInstance(serverInventoryView);
bind(DatabaseSegmentManager.class).toInstance(segmentInventoryManager);
bind(DatabaseRuleManager.class).toInstance(databaseRuleManager);
bind(DruidMaster.class).toInstance(master);