diff --git a/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java b/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java index b8609453f3d..988ee329d00 100644 --- a/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java +++ b/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java @@ -24,8 +24,9 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; -import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; import io.druid.initialization.DruidModule; +import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import java.util.List; @@ -37,7 +38,8 @@ public class IndexingServiceFirehoseModule implements DruidModule return ImmutableList.of( new SimpleModule("IndexingServiceFirehoseModule") .registerSubtypes( - new NamedType(EventReceiverFirehoseFactory.class, "receiver") + new NamedType(EventReceiverFirehoseFactory.class, "receiver"), + new NamedType(IngestSegmentFirehoseFactory.class, "ingestSegment") ) ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index f9395165f27..f83453fafe9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.firehose.IngestTask; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -53,7 +54,8 @@ import io.druid.query.QueryRunner; @JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class), @JsonSubTypes.Type(name = "noop", value = NoopTask.class), @JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class), - @JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterTask.SubTask.class) + @JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterTask.SubTask.class), + @JsonSubTypes.Type(name = "ingest-task", value = IngestTask.class) }) public interface Task { diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java new file mode 100644 index 00000000000..1358d3d48c3 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -0,0 +1,372 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.firehose; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import com.google.inject.Injector; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.guava.Yielder; +import com.metamx.common.guava.YieldingAccumulator; +import com.metamx.common.parsers.ParseException; +import com.metamx.emitter.EmittingLogger; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.impl.InputRowParser; +import io.druid.granularity.QueryGranularity; +import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.TaskToolboxFactory; +import io.druid.indexing.common.actions.SegmentListUsedAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.task.AbstractTask; +import io.druid.query.filter.DimFilter; +import io.druid.query.select.EventHolder; +import io.druid.segment.Cursor; +import io.druid.segment.DimensionSelector; +import io.druid.segment.IndexIO; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.QueryableIndexStorageAdapter; +import io.druid.segment.StorageAdapter; +import io.druid.segment.TimestampColumnSelector; +import io.druid.segment.data.IndexedInts; +import io.druid.segment.filter.Filters; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.timeline.DataSegment; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.VersionedIntervalTimeline; +import io.druid.utils.Runnables; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class IngestSegmentFirehoseFactory implements FirehoseFactory +{ + private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class); + private final String dataSource; + private final Interval interval; + private final DimFilter dimFilter; + private final List dimensions; + private final List metrics; + private final Injector injector; + + @JsonCreator + public IngestSegmentFirehoseFactory( + @JsonProperty("dataSource") final String dataSource, + @JsonProperty("interval") Interval interval, + @JsonProperty("filter") DimFilter dimFilter, + @JsonProperty("dimensions") List dimensions, + @JsonProperty("metrics") List metrics, + @JacksonInject Injector injector + ) + { + Preconditions.checkNotNull(dataSource, "dataSource"); + Preconditions.checkNotNull(interval, "interval"); + this.dataSource = dataSource; + this.interval = interval; + this.dimFilter = dimFilter; + this.dimensions = dimensions; + this.metrics = metrics; + this.injector = injector; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @JsonProperty("filter") + public DimFilter getDimensionsFilter() + { + return dimFilter; + } + + @JsonProperty + public List getDimensions() + { + return dimensions; + } + + @JsonProperty + public List getMetrics() + { + return metrics; + } + + @Override + public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException + { + log.info("Connecting firehose: IngestSegmentFirehose[%s,%s]", dataSource, interval); + // better way to achieve this is to pass toolbox to Firehose, The instance is initialized Lazily on connect method. + final TaskToolbox toolbox = injector.getInstance(TaskToolboxFactory.class).build( + new IngestTask("Ingest-Task-Id", dataSource) + ); + + try { + final List usedSegments = toolbox + .getTaskActionClient() + .submit(new SegmentListUsedAction(dataSource, interval)); + final Map segmentFileMap = toolbox.fetchSegments(usedSegments); + VersionedIntervalTimeline timeline = new VersionedIntervalTimeline( + Ordering.natural().nullsFirst() + ); + + for (DataSegment segment : usedSegments) { + timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)); + } + final List> timeLineSegments = timeline.lookup( + interval + ); + + List dims; + if (dimensions != null) { + dims = dimensions; + } else { + Set dimSet = new HashSet<>(); + for (TimelineObjectHolder timelineObjectHolder : timeLineSegments) { + dimSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getDimensions()); + } + dims = Lists.newArrayList(dimSet); + } + + List metricsList; + if (metrics != null) { + metricsList = metrics; + } else { + Set metricsSet = new HashSet<>(); + for (TimelineObjectHolder timelineObjectHolder : timeLineSegments) { + metricsSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getDimensions()); + } + metricsList = Lists.newArrayList(metricsSet); + } + + + final List adapters = Lists.transform( + timeLineSegments, + new Function, StorageAdapter>() + { + @Override + public StorageAdapter apply(TimelineObjectHolder input) + { + final DataSegment segment = input.getObject().getChunk(0).getObject(); + final File file = Preconditions.checkNotNull( + segmentFileMap.get(segment), + "File for segment %s", segment.getIdentifier() + ); + + try { + return new QueryableIndexStorageAdapter((IndexIO.loadIndex(file))); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + } + ); + + return new IngestSegmentFirehose(adapters, dims, metricsList); + + } + catch (IOException e) { + throw Throwables.propagate(e); + } + catch (SegmentLoadingException e) { + throw Throwables.propagate(e); + } + + } + + @Override + public InputRowParser getParser() + { + return null; + } + + public class IngestSegmentFirehose implements Firehose + { + private volatile Yielder rowYielder; + + public IngestSegmentFirehose(List adapters, final List dims, final List metrics) + { + Sequence rows = Sequences.concat( + Iterables.transform( + adapters, new Function>() + { + @Nullable + @Override + public Sequence apply(@Nullable StorageAdapter adapter) + { + return Sequences.concat( + Sequences.map( + adapter.makeCursors( + Filters.convertDimensionFilters(dimFilter), + interval, + QueryGranularity.ALL + ), new Function>() + { + @Nullable + @Override + public Sequence apply(@Nullable final Cursor cursor) + { + final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector(); + + final Map dimSelectors = Maps.newHashMap(); + for (String dim : dims) { + final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim); + dimSelectors.put(dim, dimSelector); + } + + final Map metSelectors = Maps.newHashMap(); + for (String metric : metrics) { + final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); + metSelectors.put(metric, metricSelector); + } + + return Sequences.simple( + new Iterable() + { + @Override + public Iterator iterator() + { + return new Iterator() + { + @Override + public boolean hasNext() + { + return !cursor.isDone(); + } + + @Override + public InputRow next() + { + final Map theEvent = Maps.newLinkedHashMap(); + final long timestamp = timestampColumnSelector.getTimestamp(); + theEvent.put(EventHolder.timestampKey, new DateTime(timestamp)); + + for (Map.Entry dimSelector : dimSelectors.entrySet()) { + final String dim = dimSelector.getKey(); + final DimensionSelector selector = dimSelector.getValue(); + final IndexedInts vals = selector.getRow(); + + if (vals.size() == 1) { + final String dimVal = selector.lookupName(vals.get(0)); + theEvent.put(dim, dimVal); + } else { + List dimVals = Lists.newArrayList(); + for (int i = 0; i < vals.size(); ++i) { + dimVals.add(selector.lookupName(vals.get(i))); + } + theEvent.put(dim, dimVals); + } + } + + for (Map.Entry metSelector : metSelectors.entrySet()) { + final String metric = metSelector.getKey(); + final ObjectColumnSelector selector = metSelector.getValue(); + theEvent.put(metric, selector.get()); + } + cursor.advance(); + return new MapBasedInputRow(timestamp, dims, theEvent); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException("Remove Not Supported"); + } + }; + } + } + ); + } + } + ) + ); + } + } + ) + ); + rowYielder = rows.toYielder( + null, + new YieldingAccumulator() + { + @Override + public Object accumulate(Object accumulated, Object in) + { + yield(); + return in; + } + } + ); + } + + @Override + public boolean hasMore() + { + return !rowYielder.isDone(); + } + + @Override + public InputRow nextRow() + { + final InputRow inputRow = rowYielder.get(); + rowYielder = rowYielder.next(null); + return inputRow; + } + + @Override + public Runnable commit() + { + return Runnables.getNoopRunnable(); + } + + @Override + public void close() throws IOException + { + rowYielder.close(); + } + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestTask.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestTask.java new file mode 100644 index 00000000000..a2b37c6af92 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestTask.java @@ -0,0 +1,55 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.firehose; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.task.AbstractTask; + +public class IngestTask extends AbstractTask +{ + public IngestTask( + @JsonProperty("id") final String id, + @JsonProperty("dataSource") final String dataSource + ) + { + super(id, dataSource); + } + + @Override + public String getType() + { + return "Ingest-Task"; + } + + @Override + public boolean isReady(TaskActionClient taskActionClient) throws Exception + { + return true; + } + + @Override + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + return TaskStatus.success(getId()); + } +} \ No newline at end of file diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index 137024c61f4..7eb83b70f02 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -963,7 +963,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter if (multiValueRow.size() == 0) { return null; } else if (multiValueRow.size() == 1) { - return columnVals.lookupName(multiValueRow.get(1)); + return columnVals.lookupName(multiValueRow.get(0)); } else { final String[] strings = new String[multiValueRow.size()]; for (int i = 0 ; i < multiValueRow.size() ; i++) { diff --git a/server/src/main/java/io/druid/guice/FirehoseModule.java b/server/src/main/java/io/druid/guice/FirehoseModule.java index ac10c297dd4..472fa2ff8fa 100644 --- a/server/src/main/java/io/druid/guice/FirehoseModule.java +++ b/server/src/main/java/io/druid/guice/FirehoseModule.java @@ -26,6 +26,7 @@ import com.google.inject.Binder; import io.druid.data.input.ProtoBufInputRowParser; import io.druid.initialization.DruidModule; import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; +import io.druid.segment.realtime.firehose.CombiningFirehoseFactory; import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import io.druid.segment.realtime.firehose.IrcFirehoseFactory; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; @@ -53,7 +54,8 @@ public class FirehoseModule implements DruidModule new NamedType(TimedShutoffFirehoseFactory.class, "timed"), new NamedType(IrcFirehoseFactory.class, "irc"), new NamedType(LocalFirehoseFactory.class, "local"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver") + new NamedType(EventReceiverFirehoseFactory.class, "receiver"), + new NamedType(CombiningFirehoseFactory.class, "combining") ) ); } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java new file mode 100644 index 00000000000..f4a5c8bba6d --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java @@ -0,0 +1,125 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.segment.realtime.firehose; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.api.client.repackaged.com.google.common.base.Throwables; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.InputRowParser; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +/** + * Creates firehose that combines data from different Firehoses. Useful for ingesting data from multiple sources. + */ +public class CombiningFirehoseFactory implements FirehoseFactory +{ + private final List delegateFactoryList; + + @JsonCreator + public CombiningFirehoseFactory( + @JsonProperty("delegates") List delegateFactoryList + ) + { + Preconditions.checkArgument(!delegateFactoryList.isEmpty()); + this.delegateFactoryList = delegateFactoryList; + } + + @Override + public Firehose connect(InputRowParser parser) throws IOException + { + return new CombiningFirehose(parser); + } + + @Override + public InputRowParser getParser() + { + return delegateFactoryList.get(0).getParser(); + } + + @JsonProperty("delegates") + public List getDelegateFactoryList() + { + return delegateFactoryList; + } + + public class CombiningFirehose implements Firehose + { + private final InputRowParser parser; + private final Iterator firehoseFactoryIterator; + private volatile Firehose currentFirehose; + + public CombiningFirehose(InputRowParser parser) throws IOException + { + this.firehoseFactoryIterator = delegateFactoryList.iterator(); + this.parser = parser; + nextFirehose(); + } + + private void nextFirehose() + { + if (firehoseFactoryIterator.hasNext()) { + try { + if (currentFirehose != null) { + currentFirehose.close(); + } + currentFirehose = firehoseFactoryIterator.next().connect(parser); + } + catch (IOException e) { + Throwables.propagate(e); + } + } + } + + @Override + public boolean hasMore() + { + return currentFirehose.hasMore(); + } + + @Override + public InputRow nextRow() + { + InputRow rv = currentFirehose.nextRow(); + if (!currentFirehose.hasMore()) { + nextFirehose(); + } + return rv; + } + + @Override + public Runnable commit() + { + return currentFirehose.commit(); + } + + @Override + public void close() throws IOException + { + currentFirehose.close(); + } + } +} diff --git a/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java b/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java new file mode 100644 index 00000000000..a69327b86be --- /dev/null +++ b/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java @@ -0,0 +1,147 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.realtime.firehose; + +import com.google.common.collect.Lists; +import com.metamx.common.parsers.ParseException; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.InputRowParser; +import io.druid.segment.realtime.firehose.CombiningFirehoseFactory; +import io.druid.utils.Runnables; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +public class CombiningFirehoseFactoryTest +{ + + @Test + public void testCombiningfirehose() throws IOException + { + List list1 = Arrays.asList(makeRow(1, 1), makeRow(2, 2)); + List list2 = Arrays.asList(makeRow(3, 3), makeRow(4, 4)); + FirehoseFactory combiningFactory = new CombiningFirehoseFactory( + Arrays.asList( + new ListFirehoseFactory( + list1 + ), new ListFirehoseFactory(list2) + ) + ); + final Firehose firehose = combiningFactory.connect(null); + for (int i = 1; i < 5; i++) { + Assert.assertTrue(firehose.hasMore()); + final InputRow inputRow = firehose.nextRow(); + Assert.assertEquals(i, inputRow.getTimestampFromEpoch()); + Assert.assertEquals(i, inputRow.getFloatMetric("test"), 0); + } + Assert.assertFalse(firehose.hasMore()); + } + + private InputRow makeRow(final long timestamp, final float metricValue) + { + return new InputRow() + { + @Override + public List getDimensions() + { + return Arrays.asList("testDim"); + } + + @Override + public long getTimestampFromEpoch() + { + return timestamp; + } + + @Override + public List getDimension(String dimension) + { + return Lists.newArrayList(); + } + + @Override + public float getFloatMetric(String metric) + { + return metricValue; + } + + @Override + public Object getRaw(String dimension) + { + return null; + } + + }; + } + + public static class ListFirehoseFactory implements FirehoseFactory + { + private final List rows; + + ListFirehoseFactory(List rows) + { + this.rows = rows; + } + + @Override + public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException + { + final Iterator iterator = rows.iterator(); + return new Firehose() + { + @Override + public boolean hasMore() + { + return iterator.hasNext(); + } + + @Override + public InputRow nextRow() + { + return iterator.next(); + } + + @Override + public Runnable commit() + { + return Runnables.getNoopRunnable(); + } + + @Override + public void close() throws IOException + { + // + } + }; + } + + @Override + public InputRowParser getParser() + { + return null; + } + } +}