Merge pull request #2505 from gianm/rt-exceptions

Harmonize realtime indexing loop across the task and standalone nodes.
Fangjin Yang 2016-02-19 11:23:14 -08:00
commit a3c29b91cc
3 changed files with 82 additions and 61 deletions

RealtimeIndexTask.java

@@ -59,6 +59,7 @@ import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
 import io.druid.segment.realtime.plumber.Committers;
 import io.druid.segment.realtime.plumber.Plumber;
 import io.druid.segment.realtime.plumber.PlumberSchool;
+import io.druid.segment.realtime.plumber.Plumbers;
 import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
 import io.druid.segment.realtime.plumber.VersioningPolicy;
 import io.druid.server.coordination.DataSegmentAnnouncer;
@@ -336,31 +337,7 @@ public class RealtimeIndexTask extends AbstractTask
       // Time to read data!
       while (firehose != null && (!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {
-        final InputRow inputRow;
-        try {
-          inputRow = firehose.nextRow();
-          if (inputRow == null) {
-            log.debug("thrown away null input row, considering unparseable");
-            fireDepartment.getMetrics().incrementUnparseable();
-            continue;
-          }
-        }
-        catch (ParseException e) {
-          log.debug(e, "thrown away line due to exception, considering unparseable");
-          fireDepartment.getMetrics().incrementUnparseable();
-          continue;
-        }
-        int numRows = plumber.add(inputRow, committerSupplier);
-        if (numRows == -1) {
-          fireDepartment.getMetrics().incrementThrownAway();
-          log.debug("Throwing away event[%s]", inputRow);
-          continue;
-        }
-        fireDepartment.getMetrics().incrementProcessed();
+        Plumbers.addNextRow(committerSupplier, firehose, plumber, fireDepartment.getMetrics());
       }
     }
     catch (Throwable e) {

RealtimeManager.java

@@ -50,6 +50,7 @@ import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.realtime.plumber.Committers;
 import io.druid.segment.realtime.plumber.Plumber;
+import io.druid.segment.realtime.plumber.Plumbers;
 import org.joda.time.Interval;

 import java.io.Closeable;
@@ -339,42 +340,7 @@ public class RealtimeManager implements QuerySegmentWalker
    {
      final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);

      while (firehose.hasMore()) {
-        final InputRow inputRow;
-        try {
-          inputRow = firehose.nextRow();
-          if (inputRow == null) {
-            log.debug("thrown away null input row, considering unparseable");
-            metrics.incrementUnparseable();
-            continue;
-          }
-        }
-        catch (ParseException e) {
-          log.debug(e, "thrown away line due to exception, considering unparseable");
-          metrics.incrementUnparseable();
-          continue;
-        }
-        boolean lateEvent = false;
-        boolean indexLimitExceeded = false;
-        try {
-          lateEvent = plumber.add(inputRow, committerSupplier) == -1;
-        }
-        catch (IndexSizeExceededException e) {
-          log.info("Index limit exceeded: %s", e.getMessage());
-          indexLimitExceeded = true;
-        }
-        if (indexLimitExceeded || lateEvent) {
-          metrics.incrementThrownAway();
-          log.debug("Throwing away event[%s]", inputRow);
-          if (indexLimitExceeded) {
-            plumber.persist(committerSupplier.get());
-          }
-          continue;
-        }
-        metrics.incrementProcessed();
+        Plumbers.addNextRow(committerSupplier, firehose, plumber, metrics);
      }
    }

Plumbers.java (new file)

@@ -0,0 +1,78 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.segment.realtime.plumber;

import com.google.common.base.Supplier;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.realtime.FireDepartmentMetrics;

public class Plumbers
{
  private static final Logger log = new Logger(Plumbers.class);

  private Plumbers()
  {
    // No instantiation
  }

  public static void addNextRow(
      final Supplier<Committer> committerSupplier,
      final Firehose firehose,
      final Plumber plumber,
      final FireDepartmentMetrics metrics
  )
  {
    try {
      final InputRow inputRow = firehose.nextRow();

      if (inputRow == null) {
        log.debug("Discarded null input row, considering unparseable.");
        metrics.incrementUnparseable();
        return;
      }

      // Included in ParseException try/catch, as additional parsing can be done during indexing.
      int numRows = plumber.add(inputRow, committerSupplier);

      if (numRows == -1) {
        metrics.incrementThrownAway();
        log.debug("Discarded row[%s], considering thrownAway.", inputRow);
        return;
      }

      metrics.incrementProcessed();
    }
    catch (ParseException e) {
      log.debug(e, "Discarded row due to exception, considering unparseable.");
      metrics.incrementUnparseable();
    }
    catch (IndexSizeExceededException e) {
      // Shouldn't happen if this is only being called by a single thread.
      // plumber.add should be swapping out indexes before they fill up.
      throw new ISE(e, "WTF?! Index size exceeded, this shouldn't happen. Bad Plumber!");
    }
  }
}
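
For illustration only: after this change, both call sites above collapse to the same loop, each node supplying its own Firehose, Plumber, and FireDepartmentMetrics. The following minimal sketch of that shared caller pattern uses only classes that appear in the diffs; the wrapper class name IngestLoopSketch and its method are hypothetical and not part of the commit.

import com.google.common.base.Supplier;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Plumbers;

// Hypothetical wrapper, shown only to illustrate the harmonized loop.
public class IngestLoopSketch
{
  public static void drain(Firehose firehose, Plumber plumber, FireDepartmentMetrics metrics)
  {
    // Commit metadata is derived from the firehose itself, as in RealtimeManager above.
    final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);

    while (firehose.hasMore()) {
      // Each call consumes one row and increments exactly one counter:
      // processed, unparseable, or thrownAway.
      Plumbers.addNextRow(committerSupplier, firehose, plumber, metrics);
    }
  }
}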