mirror of https://github.com/apache/druid.git
Merge branch 'master' of github.com:metamx/druid
This commit is contained in:
commit
08318af7f7
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.5.36-SNAPSHOT</version>
|
||||
<version>0.5.37-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.5.36-SNAPSHOT</version>
|
||||
<version>0.5.37-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.5.36-SNAPSHOT</version>
|
||||
<version>0.5.37-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.5.36-SNAPSHOT</version>
|
||||
<version>0.5.37-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.5.36-SNAPSHOT</version>
|
||||
<version>0.5.37-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.5.36-SNAPSHOT</version>
|
||||
<version>0.5.37-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -23,7 +23,7 @@
|
|||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<version>0.5.36-SNAPSHOT</version>
|
||||
<version>0.5.37-SNAPSHOT</version>
|
||||
<name>druid</name>
|
||||
<description>druid</description>
|
||||
<scm>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.5.36-SNAPSHOT</version>
|
||||
<version>0.5.37-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -163,38 +163,40 @@ public class RealtimeManager implements QuerySegmentWalker
|
|||
while (firehose.hasMore()) {
|
||||
final InputRow inputRow;
|
||||
try {
|
||||
inputRow = firehose.nextRow();
|
||||
try {
|
||||
inputRow = firehose.nextRow();
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.info(e, "thrown away line due to exception");
|
||||
metrics.incrementThrownAway();
|
||||
continue;
|
||||
}
|
||||
|
||||
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
|
||||
if (sink == null) {
|
||||
metrics.incrementThrownAway();
|
||||
log.debug("Throwing away event[%s]", inputRow);
|
||||
|
||||
if (System.currentTimeMillis() > nextFlush) {
|
||||
plumber.persist(firehose.commit());
|
||||
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
int currCount = sink.add(inputRow);
|
||||
metrics.incrementProcessed();
|
||||
if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
|
||||
plumber.persist(firehose.commit());
|
||||
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
|
||||
}
|
||||
}
|
||||
catch (FormattedException e) {
|
||||
log.info(e, "unparseable line: %s", e.getDetails());
|
||||
metrics.incrementUnparseable();
|
||||
continue;
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.info(e, "thrown away line due to exception");
|
||||
metrics.incrementThrownAway();
|
||||
continue;
|
||||
}
|
||||
|
||||
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
|
||||
if (sink == null) {
|
||||
metrics.incrementThrownAway();
|
||||
log.debug("Throwing away event[%s]", inputRow);
|
||||
|
||||
if (System.currentTimeMillis() > nextFlush) {
|
||||
plumber.persist(firehose.commit());
|
||||
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
int currCount = sink.add(inputRow);
|
||||
metrics.incrementProcessed();
|
||||
if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
|
||||
plumber.persist(firehose.commit());
|
||||
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
log.makeAlert(e, "RuntimeException aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource())
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.5.36-SNAPSHOT</version>
|
||||
<version>0.5.37-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -24,11 +24,11 @@
|
|||
<artifactId>druid-services</artifactId>
|
||||
<name>druid-services</name>
|
||||
<description>druid-services</description>
|
||||
<version>0.5.36-SNAPSHOT</version>
|
||||
<version>0.5.37-SNAPSHOT</version>
|
||||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.5.36-SNAPSHOT</version>
|
||||
<version>0.5.37-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
Loading…
Reference in New Issue