fix the example assembly and have better error logging for rabbitmq

This commit is contained in:
fjy 2013-10-03 10:54:25 -07:00
parent 8885412b33
commit bf157ea8ef
4 changed files with 24 additions and 52 deletions

View File

@ -1,46 +0,0 @@
[{
"schema": {
"dataSource": "flights",
"aggregators": [
{"type": "count", "name": "flights"},
{"type": "longSum", "name": "Distance", "fieldName": "Distance"},
{"type": "longSum", "name": "TaxiIn", "fieldName": "TaxiIn"},
{"type": "longSum", "name": "TaxiOut", "fieldName": "TaxiOut"},
{"type": "longSum", "name": "CarrierDelay", "fieldName": "CarrierDelay"},
{"type": "longSum", "name": "WeatherDelay", "fieldName": "WeatherDelay"},
{"type": "longSum", "name": "NASDelay", "fieldName": "NASDelay"},
{"type": "longSum", "name": "SecurityDelay", "fieldName": "SecurityDelay"},
{"type": "longSum", "name": "LateAircraftDelay", "fieldName": "LateAircraftDelay"},
{"type": "longSum", "name": "ArrDelay", "fieldName": "ArrDelay"},
{"type": "longSum", "name": "DepDelay", "fieldName": "DepDelay"},
{"type": "longSum", "name": "CRSElapsedTime", "fieldName": "CRSElapsedTime"},
{"type": "longSum", "name": "ActualElapsedTime", "fieldName": "ActualElapsedTime"},
{"type": "longSum", "name": "AirTime", "fieldName": "AirTime"}
],
"indexGranularity": "minute",
"shardSpec": {"type": "none"}
},
"config": {
"maxRowsInMemory": 650000,
"intermediatePersistPeriod": "PT30m"
},
"firehose": {
"type": "flights",
"directory": "/Users/cheddar/work/third-party/sensei-ba/sensei-ba-1.5.1-SNAPSHOT/flights-data/converted/druid_in",
"parser" : { "timestampSpec" : { "column" : "timestamp", "format" : "iso" },
"data" : { "format" : "json" },
"dimensionExclusions" : ["Distance", "TaxiIn", "TaxiOut", "CarrierDelay", "WeatherDelay",
"NASDelay", "SecurityDelay", "LateAircraftDelay", "ArrDelay",
"DepDelay", "CRSElapsedTime", "ActualElapsedTime", "AirTime"] }
},
"plumber": {
"type": "realtime",
"windowPeriod": "P365d",
"segmentGranularity": "year",
"basePersistDirectory": "/tmp/druid/flights/basePersist",
"rejectionPolicy": {"type": "messageTime"}
}
}]

View File

@ -21,6 +21,7 @@ package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@ -120,7 +121,14 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
boolean exclusive = config.isExclusive();
boolean autoDelete = config.isAutoDelete();
final Connection connection = connectionFactory.newConnection();
final Connection connection;
try {
connection = connectionFactory.newConnection();
} catch (Exception e) {
log.error("Unable to find a RabbitMQ broker. Are you sure you have one running?");
throw Throwables.propagate(e);
}
connection.addShutdownListener(new ShutdownListener()
{
@Override

View File

@ -500,7 +500,17 @@ public class RealtimePlumberSchool implements PlumberSchool
private void bootstrapSinksFromDisk()
{
for (File sinkDir : computeBaseDir(schema).listFiles()) {
File baseDir = computeBaseDir(schema);
if (baseDir == null || !baseDir.exists()) {
return;
}
File[] files = baseDir.listFiles();
if (files == null) {
return;
}
for (File sinkDir : files) {
Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/"));
//final File[] sinkFiles = sinkDir.listFiles();

View File

@ -22,11 +22,11 @@
<outputDirectory>config/broker</outputDirectory>
</fileSet>
<fileSet>
<directory>../examples/config/master</directory>
<directory>../examples/config/coordinator</directory>
<includes>
<include>*</include>
</includes>
<outputDirectory>config/master</outputDirectory>
<outputDirectory>config/coordinator</outputDirectory>
</fileSet>
<fileSet>
<directory>../examples/config/realtime</directory>
@ -36,11 +36,11 @@
<outputDirectory>config/realtime</outputDirectory>
</fileSet>
<fileSet>
<directory>../examples/config/compute</directory>
<directory>../examples/config/historical</directory>
<includes>
<include>*</include>
</includes>
<outputDirectory>config/compute</outputDirectory>
<outputDirectory>config/historical</outputDirectory>
</fileSet>
<fileSet>
<directory>../examples/bin</directory>