HadoopyStringInputRowParser to convert stringy Text, BytesWritable, etc. into InputRow

Himanshu Gupta 2015-08-31 22:27:34 -05:00
parent 74f4572bd4
commit e8b9ee85a7
6 changed files with 217 additions and 23 deletions
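
The new parser is selected in an ingestion spec with type "hadoopyString". Pretty-printed, the parser spec JSON round-tripped by the serde test at the bottom of this commit is:

{
  "type" : "hadoopyString",
  "parseSpec" : {
    "format" : "json",
    "timestampSpec" : { "column" : "xXx" },
    "dimensionsSpec" : { }
  }
}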

HadoopDruidIndexerConfig.java

@@ -103,7 +103,8 @@ public class HadoopDruidIndexerConfig
               binder, Key.get(DruidNode.class, Self.class), new DruidNode("hadoop-indexer", null, null)
           );
         }
-      }
+      },
+      new IndexingHadoopModule()
   )
 );
 jsonMapper = injector.getInstance(ObjectMapper.class);

HadoopDruidIndexerMapper.java

@@ -104,11 +104,9 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
 {
   if (parser instanceof StringInputRowParser && value instanceof Text) {
     //Note: This is to ensure backward compatibility with 0.7.0 and before
+    //HadoopyStringInputRowParser can handle this and this special case is not needed
+    //except for backward compatibility
     return ((StringInputRowParser) parser).parse(value.toString());
-  } else if (parser instanceof StringInputRowParser && value instanceof BytesWritable) {
-    BytesWritable valueBytes = (BytesWritable) value;
-    ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes.getBytes(), 0, valueBytes.getLength());
-    return ((StringInputRowParser) parser).parse(valueBuffer);
   } else if (value instanceof InputRow) {
     return (InputRow) value;
   } else {

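The BytesWritable branch removed here moves into the new parser in the next file. One detail worth noting when it is reimplemented there: BytesWritable.getBytes() returns the internal backing array, which can be longer than the valid data, so the wrap must be bounded by getLength(). A minimal sketch of the pitfall (the sample line is illustrative):

byte[] data = "2014102300,a.example.com,100".getBytes(Charsets.UTF_8);
BytesWritable w = new BytesWritable();
w.set(data, 0, data.length); // the backing array may be over-allocated on reuse
// wrap only the valid region; w.getBytes().length can exceed w.getLength()
ByteBuffer valueBuffer = ByteBuffer.wrap(w.getBytes(), 0, w.getLength());
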
HadoopyStringInputRowParser.java

@ -0,0 +1,69 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.IAE;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import java.nio.ByteBuffer;
/**
*/
public class HadoopyStringInputRowParser implements InputRowParser<Object>
{
private final StringInputRowParser parser;
public HadoopyStringInputRowParser(@JsonProperty("parseSpec") ParseSpec parseSpec)
{
this.parser = new StringInputRowParser(parseSpec);
}
@Override
public InputRow parse(Object input)
{
if (input instanceof Text) {
return parser.parse(((Text) input).toString());
} else if (input instanceof BytesWritable) {
BytesWritable valueBytes = (BytesWritable) input;
return parser.parse(ByteBuffer.wrap(valueBytes.getBytes(), 0, valueBytes.getLength()));
} else {
throw new IAE("can't convert type [%s] to InputRow", input.getClass().getName());
}
}
@JsonProperty
@Override
public ParseSpec getParseSpec()
{
return parser.getParseSpec();
}
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return new HadoopyStringInputRowParser(parseSpec);
}
}

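A usage sketch of the new parser with the CSV parse spec that IndexGeneratorJobTest uses below; the input line and host value are illustrative, but the calls are the ones defined above (ImmutableList and Charsets are Guava, as elsewhere in this commit):

InputRowParser parser = new HadoopyStringInputRowParser(
    new CSVParseSpec(
        new TimestampSpec("timestamp", "yyyyMMddHH", null),
        new DimensionsSpec(ImmutableList.of("host"), null, null),
        null,
        ImmutableList.of("timestamp", "host", "visited_num")
    )
);

String line = "2014102300,a.example.com,100";
InputRow fromText = parser.parse(new Text(line));                                    // Text path
InputRow fromBytes = parser.parse(new BytesWritable(line.getBytes(Charsets.UTF_8))); // BytesWritable path
// both paths yield the same row; any other value type throws IAE
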
IndexingHadoopModule.java

@@ -0,0 +1,50 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.indexer;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;

import java.util.Arrays;
import java.util.List;

/**
 */
public class IndexingHadoopModule implements DruidModule
{
  @Override
  public List<? extends Module> getJacksonModules()
  {
    return Arrays.<Module>asList(
        new SimpleModule("IndexingHadoopModule")
            .registerSubtypes(
                new NamedType(HadoopyStringInputRowParser.class, "hadoopyString")
            )
    );
  }

  @Override
  public void configure(Binder binder)
  {
  }
}

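Registering the NamedType is what makes "type":"hadoopyString" resolvable when Jackson deserializes an InputRowParser. A sketch of the effect in isolation, assuming a DefaultObjectMapper that already knows Druid's annotation-driven base types (the indexer itself gets the module through the injector change in HadoopDruidIndexerConfig above):

ObjectMapper mapper = new DefaultObjectMapper();
for (Module jacksonModule : new IndexingHadoopModule().getJacksonModules()) {
  mapper.registerModule(jacksonModule); // adds the "hadoopyString" subtype
}
InputRowParser parser = mapper.readValue(
    "{\"type\":\"hadoopyString\",\"parseSpec\":{\"format\":\"json\",\"timestampSpec\":{\"column\":\"xXx\"},\"dimensionsSpec\":{}}}",
    InputRowParser.class
);
// parser is now a HadoopyStringInputRowParser
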
HadoopyStringInputRowParserTest.java

@@ -0,0 +1,51 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.indexer;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.data.input.impl.InputRowParser;
import org.junit.Assert;
import org.junit.Test;

/**
 */
public class HadoopyStringInputRowParserTest
{
  @Test
  public void testSerde() throws Exception
  {
    String jsonStr = "{"
                     + "\"type\":\"hadoopyString\","
                     + "\"parseSpec\":{\"format\":\"json\",\"timestampSpec\":{\"column\":\"xXx\"},\"dimensionsSpec\":{}}"
                     + "}";

    ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
    InputRowParser parser = jsonMapper.readValue(
        jsonMapper.writeValueAsString(
            jsonMapper.readValue(jsonStr, InputRowParser.class)
        ),
        InputRowParser.class
    );

    Assert.assertTrue(parser instanceof HadoopyStringInputRowParser);
    Assert.assertEquals("xXx", parser.getParseSpec().getTimestampSpec().getTimestampColumn());
  }
}

IndexGeneratorJobTest.java

@@ -19,7 +19,6 @@
 package io.druid.indexer;

-import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.google.common.base.Charsets;
@@ -30,10 +29,10 @@ import com.google.common.collect.Maps;
 import com.metamx.common.Granularity;
 import io.druid.data.input.impl.CSVParseSpec;
 import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.InputRowParser;
 import io.druid.data.input.impl.StringInputRowParser;
 import io.druid.data.input.impl.TimestampSpec;
 import io.druid.granularity.QueryGranularity;
-import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
@@ -126,7 +125,15 @@ public class IndexGeneratorJobTest
             "2014102300,i.example.com,963",
             "2014102300,j.example.com,333"
         ),
-        null
+        null,
+        new StringInputRowParser(
+            new CSVParseSpec(
+                new TimestampSpec("timestamp", "yyyyMMddHH", null),
+                new DimensionsSpec(ImmutableList.of("host"), null, null),
+                null,
+                ImmutableList.of("timestamp", "host", "visited_num")
+            )
+        )
     },
     {
         false,
@@ -160,7 +167,15 @@ public class IndexGeneratorJobTest
             "2014102216,q.example.com,500",
             "2014102216,q.example.com,87"
         ),
-        null
+        null,
+        new HadoopyStringInputRowParser(
+            new CSVParseSpec(
+                new TimestampSpec("timestamp", "yyyyMMddHH", null),
+                new DimensionsSpec(ImmutableList.of("host"), null, null),
+                null,
+                ImmutableList.of("timestamp", "host", "visited_num")
+            )
+        )
     },
     {
         true,
@@ -194,7 +209,15 @@ public class IndexGeneratorJobTest
             "2014102216,q.example.com,500",
             "2014102216,q.example.com,87"
         ),
-        null
+        null,
+        new StringInputRowParser(
+            new CSVParseSpec(
+                new TimestampSpec("timestamp", "yyyyMMddHH", null),
+                new DimensionsSpec(ImmutableList.of("host"), null, null),
+                null,
+                ImmutableList.of("timestamp", "host", "visited_num")
+            )
+        )
     },
     {
         false,
@@ -238,7 +261,15 @@ public class IndexGeneratorJobTest
             "2014102300,i.example.com,963",
             "2014102300,j.example.com,333"
         ),
-        SequenceFileInputFormat.class.getName()
+        SequenceFileInputFormat.class.getName(),
+        new HadoopyStringInputRowParser(
+            new CSVParseSpec(
+                new TimestampSpec("timestamp", "yyyyMMddHH", null),
+                new DimensionsSpec(ImmutableList.of("host"), null, null),
+                null,
+                ImmutableList.of("timestamp", "host", "visited_num")
+            )
+        )
     }
 }
 );
@@ -257,6 +288,7 @@ public class IndexGeneratorJobTest
 private List<String> data;
 private boolean useCombiner;
 private String inputFormatName;
+private InputRowParser inputRowParser;

 public IndexGeneratorJobTest(
     boolean useCombiner,
@@ -264,7 +296,8 @@ public class IndexGeneratorJobTest
     String interval,
     Object[][][] shardInfoForEachSegment,
     List<String> data,
-    String inputFormatName
+    String inputFormatName,
+    InputRowParser inputRowParser
 ) throws IOException
 {
   this.useCombiner = useCombiner;
@@ -273,6 +306,7 @@ public class IndexGeneratorJobTest
   this.interval = new Interval(interval);
   this.data = data;
   this.inputFormatName = inputFormatName;
+  this.inputRowParser = inputRowParser;
 }

 private void writeDataToLocalSequenceFile(File outputFile, List<String> data) throws IOException
@@ -305,11 +339,9 @@ public class IndexGeneratorJobTest
 @Before
 public void setUp() throws Exception
 {
-  mapper = new DefaultObjectMapper();
+  mapper = HadoopDruidIndexerConfig.jsonMapper;
   mapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
   mapper.registerSubtypes(new NamedType(SingleDimensionShardSpec.class, "single"));
-  InjectableValues inject = new InjectableValues.Std().addValue(ObjectMapper.class, mapper);
-  mapper.setInjectableValues(inject);

   dataFile = temporaryFolder.newFile();
   tmpDir = temporaryFolder.newFolder();
@@ -332,14 +364,7 @@ public class IndexGeneratorJobTest
         new DataSchema(
             "website",
             mapper.convertValue(
-                new StringInputRowParser(
-                    new CSVParseSpec(
-                        new TimestampSpec("timestamp", "yyyyMMddHH", null),
-                        new DimensionsSpec(ImmutableList.of("host"), null, null),
-                        null,
-                        ImmutableList.of("timestamp", "host", "visited_num")
-                    )
-                ),
+                inputRowParser,
                 Map.class
             ),
             new AggregatorFactory[]{