diff --git a/examples/flights.spec b/examples/flights.spec deleted file mode 100644 index 0bb544d29d6..00000000000 --- a/examples/flights.spec +++ /dev/null @@ -1,46 +0,0 @@ -[{ - "schema": { - "dataSource": "flights", - "aggregators": [ - {"type": "count", "name": "flights"}, - {"type": "longSum", "name": "Distance", "fieldName": "Distance"}, - {"type": "longSum", "name": "TaxiIn", "fieldName": "TaxiIn"}, - {"type": "longSum", "name": "TaxiOut", "fieldName": "TaxiOut"}, - {"type": "longSum", "name": "CarrierDelay", "fieldName": "CarrierDelay"}, - {"type": "longSum", "name": "WeatherDelay", "fieldName": "WeatherDelay"}, - {"type": "longSum", "name": "NASDelay", "fieldName": "NASDelay"}, - {"type": "longSum", "name": "SecurityDelay", "fieldName": "SecurityDelay"}, - {"type": "longSum", "name": "LateAircraftDelay", "fieldName": "LateAircraftDelay"}, - {"type": "longSum", "name": "ArrDelay", "fieldName": "ArrDelay"}, - {"type": "longSum", "name": "DepDelay", "fieldName": "DepDelay"}, - {"type": "longSum", "name": "CRSElapsedTime", "fieldName": "CRSElapsedTime"}, - {"type": "longSum", "name": "ActualElapsedTime", "fieldName": "ActualElapsedTime"}, - {"type": "longSum", "name": "AirTime", "fieldName": "AirTime"} - ], - "indexGranularity": "minute", - "shardSpec": {"type": "none"} - }, - - "config": { - "maxRowsInMemory": 650000, - "intermediatePersistPeriod": "PT30m" - }, - - "firehose": { - "type": "flights", - "directory": "/Users/cheddar/work/third-party/sensei-ba/sensei-ba-1.5.1-SNAPSHOT/flights-data/converted/druid_in", - "parser" : { "timestampSpec" : { "column" : "timestamp", "format" : "iso" }, - "data" : { "format" : "json" }, - "dimensionExclusions" : ["Distance", "TaxiIn", "TaxiOut", "CarrierDelay", "WeatherDelay", - "NASDelay", "SecurityDelay", "LateAircraftDelay", "ArrDelay", - "DepDelay", "CRSElapsedTime", "ActualElapsedTime", "AirTime"] } - }, - - "plumber": { - "type": "realtime", - "windowPeriod": "P365d", - "segmentGranularity": "year", - "basePersistDirectory": "/tmp/druid/flights/basePersist", - "rejectionPolicy": {"type": "messageTime"} - } -}] diff --git a/realtime/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java b/realtime/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java index c57c658956b..2cfdddacd4a 100644 --- a/realtime/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java +++ b/realtime/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java @@ -21,6 +21,7 @@ package io.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.metamx.common.logger.Logger; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -120,7 +121,14 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory boolean exclusive = config.isExclusive(); boolean autoDelete = config.isAutoDelete(); - final Connection connection = connectionFactory.newConnection(); + final Connection connection; + try { + connection = connectionFactory.newConnection(); + } catch (Exception e) { + log.error("Unable to find a RabbitMQ broker. Are you sure you have one running?"); + throw Throwables.propagate(e); + } + connection.addShutdownListener(new ShutdownListener() { @Override diff --git a/realtime/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index d7ffb255063..74893edbf02 100644 --- a/realtime/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/realtime/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -500,7 +500,17 @@ public class RealtimePlumberSchool implements PlumberSchool private void bootstrapSinksFromDisk() { - for (File sinkDir : computeBaseDir(schema).listFiles()) { + File baseDir = computeBaseDir(schema); + if (baseDir == null || !baseDir.exists()) { + return; + } + + File[] files = baseDir.listFiles(); + if (files == null) { + return; + } + + for (File sinkDir : files) { Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/")); //final File[] sinkFiles = sinkDir.listFiles(); diff --git a/services/src/assembly/assembly.xml b/services/src/assembly/assembly.xml index 63c53ae50fa..7ba9ac842c0 100644 --- a/services/src/assembly/assembly.xml +++ b/services/src/assembly/assembly.xml @@ -22,11 +22,11 @@ config/broker - ../examples/config/master + ../examples/config/coordinator * - config/master + config/coordinator ../examples/config/realtime @@ -36,11 +36,11 @@ config/realtime - ../examples/config/compute + ../examples/config/historical * - config/compute + config/historical ../examples/bin