minor fixes

fix IndexOutOfBoundsException
fix ingestFirehose
This commit is contained in:
nishantmonu51 2014-07-17 17:24:57 +05:30
parent 972c5dac31
commit e59c9ebdbc
4 changed files with 64 additions and 32 deletions

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.firehose.IngestTask;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -53,7 +54,8 @@ import io.druid.query.QueryRunner;
@JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
@JsonSubTypes.Type(name = "noop", value = NoopTask.class),
@JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class),
@JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterTask.SubTask.class)
@JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterTask.SubTask.class),
@JsonSubTypes.Type(name = "ingest-task", value = IngestTask.class)
})
public interface Task
{

View File

@ -153,12 +153,14 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<String, DataSegment>(
Ordering.<String>natural().nullsFirst()
);
final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = timeline.lookup(
interval
);
for (DataSegment segment : usedSegments) {
timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
}
final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = timeline.lookup(
interval
);
List<String> dims;
if (dimensions != null) {
dims = dimensions;
@ -223,33 +225,6 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
return null;
}
private static class IngestTask extends AbstractTask
{
protected IngestTask(String id, String dataSource)
{
super(id, dataSource);
}
@Override
public String getType()
{
return "Ingest-Task";
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return true;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
return TaskStatus.success(getId());
}
}
public class IngestSegmentFirehose implements Firehose
{
private volatile Yielder<InputRow> rowYielder;

View File

@ -0,0 +1,55 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.firehose;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
public class IngestTask extends AbstractTask
{
public IngestTask(
@JsonProperty("id") final String id,
@JsonProperty("dataSource") final String dataSource
)
{
super(id, dataSource);
}
@Override
public String getType()
{
return "Ingest-Task";
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return true;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
return TaskStatus.success(getId());
}
}

View File

@ -963,7 +963,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
if (multiValueRow.size() == 0) {
return null;
} else if (multiValueRow.size() == 1) {
return columnVals.lookupName(multiValueRow.get(1));
return columnVals.lookupName(multiValueRow.get(0));
} else {
final String[] strings = new String[multiValueRow.size()];
for (int i = 0 ; i < multiValueRow.size() ; i++) {