mirror of https://github.com/apache/druid.git
UTs update for indexing-hadoop
This commit is contained in:
parent
e6ee98e2d2
commit
30f64ff19e
|
@ -88,13 +88,22 @@
|
||||||
<artifactId>jsr305</artifactId>
|
<artifactId>jsr305</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
<!-- Tests -->
|
<!-- Tests -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>junit</groupId>
|
<groupId>junit</groupId>
|
||||||
<artifactId>junit</artifactId>
|
<artifactId>junit</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.easymock</groupId>
|
||||||
|
<artifactId>easymock</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.hamcrest</groupId>
|
||||||
|
<artifactId>hamcrest-all</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -40,10 +40,6 @@ public class SortableBytes
|
||||||
{
|
{
|
||||||
this.groupKey = groupKey;
|
this.groupKey = groupKey;
|
||||||
this.sortKey = sortKey;
|
this.sortKey = sortKey;
|
||||||
|
|
||||||
if ("".equals(sortKey)) {
|
|
||||||
throw new IllegalArgumentException();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public byte[] getGroupKey()
|
public byte[] getGroupKey()
|
||||||
|
|
|
@ -19,8 +19,6 @@ package io.druid.indexer;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.collect.Iterators;
|
|
||||||
import com.metamx.common.ISE;
|
import com.metamx.common.ISE;
|
||||||
import io.druid.jackson.DefaultObjectMapper;
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -34,8 +32,6 @@ import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -44,30 +40,19 @@ public class Utils
|
||||||
{
|
{
|
||||||
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||||
|
|
||||||
public static <K, V> Map<K, V> zipMap(Iterable<K> keys, Iterable<V> values)
|
|
||||||
{
|
|
||||||
Map<K, V> retVal = new HashMap<K, V>();
|
|
||||||
|
|
||||||
Iterator<K> keyIter = keys.iterator();
|
|
||||||
Iterator<V> valsIter = values.iterator();
|
|
||||||
while (keyIter.hasNext()) {
|
|
||||||
final K key = keyIter.next();
|
|
||||||
|
|
||||||
Preconditions.checkArgument(valsIter.hasNext(), "keys longer than vals, bad, bad vals. Broke on key[%s]", key);
|
|
||||||
retVal.put(key, valsIter.next());
|
|
||||||
}
|
|
||||||
if (valsIter.hasNext()) {
|
|
||||||
throw new ISE("More values[%d] than keys[%d]", retVal.size() + Iterators.size(valsIter), retVal.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
return retVal;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static OutputStream makePathAndOutputStream(JobContext job, Path outputPath, boolean deleteExisting)
|
public static OutputStream makePathAndOutputStream(JobContext job, Path outputPath, boolean deleteExisting)
|
||||||
throws IOException
|
throws IOException
|
||||||
{
|
{
|
||||||
OutputStream retVal;
|
OutputStream retVal;
|
||||||
FileSystem fs = outputPath.getFileSystem(job.getConfiguration());
|
FileSystem fs = outputPath.getFileSystem(job.getConfiguration());
|
||||||
|
Class<? extends CompressionCodec> codecClass;
|
||||||
|
CompressionCodec codec = null;
|
||||||
|
|
||||||
|
if (FileOutputFormat.getCompressOutput(job)) {
|
||||||
|
codecClass = FileOutputFormat.getOutputCompressorClass(job, GzipCodec.class);
|
||||||
|
codec = ReflectionUtils.newInstance(codecClass, job.getConfiguration());
|
||||||
|
outputPath = new Path(outputPath.toString() + codec.getDefaultExtension());
|
||||||
|
}
|
||||||
|
|
||||||
if (fs.exists(outputPath)) {
|
if (fs.exists(outputPath)) {
|
||||||
if (deleteExisting) {
|
if (deleteExisting) {
|
||||||
|
@ -77,16 +62,11 @@ public class Utils
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!FileOutputFormat.getCompressOutput(job)) {
|
if (FileOutputFormat.getCompressOutput(job)) {
|
||||||
retVal = fs.create(outputPath, false);
|
|
||||||
} else {
|
|
||||||
Class<? extends CompressionCodec> codecClass = FileOutputFormat.getOutputCompressorClass(job, GzipCodec.class);
|
|
||||||
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job.getConfiguration());
|
|
||||||
outputPath = new Path(outputPath.toString() + codec.getDefaultExtension());
|
|
||||||
|
|
||||||
retVal = codec.createOutputStream(fs.create(outputPath, false));
|
retVal = codec.createOutputStream(fs.create(outputPath, false));
|
||||||
|
} else {
|
||||||
|
retVal = fs.create(outputPath, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,97 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.indexer;
|
||||||
|
|
||||||
|
import com.google.common.primitives.Bytes;
|
||||||
|
|
||||||
|
import org.hamcrest.number.OrderingComparison;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.metamx.common.Pair;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
public class BucketTest
|
||||||
|
{
|
||||||
|
Bucket bucket;
|
||||||
|
int shardNum;
|
||||||
|
int partitionNum;
|
||||||
|
DateTime time;
|
||||||
|
|
||||||
|
@Before public void setUp()
|
||||||
|
{
|
||||||
|
time = new DateTime(2014, 11, 24, 10, 30);
|
||||||
|
shardNum = 1;
|
||||||
|
partitionNum = 1;
|
||||||
|
bucket = new Bucket(shardNum, time, partitionNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After public void tearDown()
|
||||||
|
{
|
||||||
|
bucket = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testToGroupKey()
|
||||||
|
{
|
||||||
|
byte[] firstPart = {1, 1, 0, 10};
|
||||||
|
byte[] secondPart = {2, 4, 0, 5};
|
||||||
|
byte[] expectedGroupParts = bucket.toGroupKey(firstPart,secondPart);
|
||||||
|
Pair<Bucket, byte[]> actualPair = Bucket.fromGroupKey(expectedGroupParts);
|
||||||
|
Assert.assertEquals("Bucket is not matching", bucket, actualPair.lhs);
|
||||||
|
Assert.assertArrayEquals("Parts not matching", Bytes.concat(firstPart,secondPart), actualPair.rhs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testToString()
|
||||||
|
{
|
||||||
|
String expectedString = "Bucket{" +
|
||||||
|
"time=" + time +
|
||||||
|
", partitionNum=" + partitionNum +
|
||||||
|
", shardNum=" + shardNum +
|
||||||
|
'}';
|
||||||
|
Assert.assertEquals(bucket.toString(),expectedString);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testEquals()
|
||||||
|
{
|
||||||
|
Assert.assertFalse("Object should not be equals to NULL", bucket.equals(null));
|
||||||
|
Assert.assertFalse("Objects do not have the same Class",bucket.equals(new Integer(0)));
|
||||||
|
Assert.assertFalse("Objects do not have the same partitionNum",
|
||||||
|
bucket.equals(new Bucket(shardNum, time, partitionNum + 1)));
|
||||||
|
Assert.assertFalse("Objects do not have the same shardNum",
|
||||||
|
bucket.equals(new Bucket(shardNum + 1,time,partitionNum)));
|
||||||
|
Assert.assertFalse("Objects do not have the same time",bucket.equals(new Bucket(shardNum,new DateTime(),partitionNum)));
|
||||||
|
Assert.assertFalse("Object do have NULL time",bucket.equals(new Bucket(shardNum,null,partitionNum)));
|
||||||
|
Assert.assertTrue("Objects must be the same",bucket.equals(new Bucket(shardNum, time, partitionNum)));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testHashCode()
|
||||||
|
{
|
||||||
|
int hashCode = bucket.hashCode();
|
||||||
|
Assert.assertThat(hashCode, OrderingComparison.greaterThanOrEqualTo(31 * partitionNum + shardNum));
|
||||||
|
bucket = new Bucket(shardNum,null,partitionNum);
|
||||||
|
hashCode = bucket.hashCode();
|
||||||
|
Assert.assertEquals(hashCode, (31 * partitionNum + shardNum));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,77 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.indexer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.mapred.JobContext;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
public class HadoopIOPeonTest
|
||||||
|
{
|
||||||
|
final String TMP_FILE_NAME = "test_file";
|
||||||
|
JobContext mockJobContext;
|
||||||
|
Configuration jobConfig;
|
||||||
|
boolean overwritesFiles = true;
|
||||||
|
HadoopIOPeon ioPeon;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TemporaryFolder tmpFolder = new TemporaryFolder();
|
||||||
|
|
||||||
|
@Before public void setUp() throws IOException
|
||||||
|
{
|
||||||
|
jobConfig = new Configuration();
|
||||||
|
mockJobContext = EasyMock.createMock(JobContext.class);
|
||||||
|
EasyMock.expect(mockJobContext.getConfiguration()).andReturn(jobConfig).anyTimes();
|
||||||
|
EasyMock.replay(mockJobContext);
|
||||||
|
|
||||||
|
ioPeon = new HadoopIOPeon(mockJobContext,new Path(tmpFolder.newFile().getParent()),overwritesFiles);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After public void tearDown()
|
||||||
|
{
|
||||||
|
jobConfig = null;
|
||||||
|
mockJobContext = null;
|
||||||
|
tmpFolder.delete();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testMakeOutputStream() throws IOException
|
||||||
|
{
|
||||||
|
Assert.assertNotNull(ioPeon.makeOutputStream(TMP_FILE_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testMakeInputStream() throws IOException
|
||||||
|
{
|
||||||
|
Assert.assertNotNull(ioPeon.makeInputStream(tmpFolder.newFile(TMP_FILE_NAME).getName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = UnsupportedOperationException.class) public void testCleanup() throws IOException
|
||||||
|
{
|
||||||
|
ioPeon.cleanup();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,131 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.indexer;
|
||||||
|
|
||||||
|
import com.google.common.io.ByteStreams;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
|
import org.apache.hadoop.mapred.JobContext;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
public class UtilsCompressionTest
|
||||||
|
{
|
||||||
|
|
||||||
|
private static final String DUMMY_STRING = "Very important string";
|
||||||
|
private static final String TMP_FILE_NAME = "test_file";
|
||||||
|
private static final Class<? extends CompressionCodec> DEFAULT_COMPRESSION_CODEC = GzipCodec.class;
|
||||||
|
private static final String CODEC_CLASS = "org.apache.hadoop.io.compress.GzipCodec";
|
||||||
|
private Configuration jobConfig;
|
||||||
|
private JobContext mockJobContext;
|
||||||
|
private FileSystem defaultFileSystem;
|
||||||
|
private CompressionCodec codec;
|
||||||
|
private File tmpFile;
|
||||||
|
private Path tmpPathWithoutExtension;
|
||||||
|
private Path tmpPathWithExtension;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TemporaryFolder tmpFolder = new TemporaryFolder();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException
|
||||||
|
{
|
||||||
|
jobConfig = new Configuration();
|
||||||
|
mockJobContext = EasyMock.createMock(JobContext.class);
|
||||||
|
EasyMock.expect(mockJobContext.getConfiguration()).andReturn(jobConfig).anyTimes();
|
||||||
|
EasyMock.replay(mockJobContext);
|
||||||
|
|
||||||
|
jobConfig.setBoolean(FileOutputFormat.COMPRESS, true);
|
||||||
|
jobConfig.set(FileOutputFormat.COMPRESS_CODEC, CODEC_CLASS);
|
||||||
|
Class<? extends CompressionCodec> codecClass = FileOutputFormat
|
||||||
|
.getOutputCompressorClass(mockJobContext, DEFAULT_COMPRESSION_CODEC);
|
||||||
|
codec = ReflectionUtils.newInstance(codecClass, jobConfig);
|
||||||
|
|
||||||
|
tmpFile = tmpFolder.newFile(TMP_FILE_NAME + codec.getDefaultExtension());
|
||||||
|
tmpPathWithExtension = new Path(tmpFile.getAbsolutePath());
|
||||||
|
tmpPathWithoutExtension = new Path(tmpFile.getParent(), TMP_FILE_NAME);
|
||||||
|
defaultFileSystem = tmpPathWithoutExtension.getFileSystem(jobConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown()
|
||||||
|
{
|
||||||
|
tmpFolder.delete();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testExistsCompressedFile() throws IOException
|
||||||
|
{
|
||||||
|
boolean expected = Utils.exists(mockJobContext,defaultFileSystem,tmpPathWithoutExtension);
|
||||||
|
Assert.assertTrue("Should be true since file is created", expected);
|
||||||
|
tmpFolder.delete();
|
||||||
|
expected = Utils.exists(mockJobContext,defaultFileSystem,tmpPathWithoutExtension);
|
||||||
|
Assert.assertFalse("Should be false since file is deleted",expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCompressedOpenInputStream() throws IOException
|
||||||
|
{
|
||||||
|
boolean overwrite = true;
|
||||||
|
OutputStream outStream = codec.createOutputStream(defaultFileSystem.create(tmpPathWithExtension, overwrite));
|
||||||
|
writeStingToOutputStream(DUMMY_STRING,outStream);
|
||||||
|
InputStream inStream = Utils.openInputStream(mockJobContext, tmpPathWithoutExtension);
|
||||||
|
Assert.assertNotNull("Input stream should not be Null",inStream);
|
||||||
|
String actual = new String(ByteStreams.toByteArray(inStream), StandardCharsets.UTF_8.toString());
|
||||||
|
Assert.assertEquals("Strings not matching", DUMMY_STRING,actual);
|
||||||
|
inStream.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCompressedMakePathAndOutputStream() throws IOException
|
||||||
|
{
|
||||||
|
boolean overwrite = true;
|
||||||
|
OutputStream outStream = Utils.makePathAndOutputStream(mockJobContext,tmpPathWithoutExtension, overwrite);
|
||||||
|
Assert.assertNotNull("Output stream should not be null",outStream);
|
||||||
|
writeStingToOutputStream(DUMMY_STRING,outStream);
|
||||||
|
InputStream inStream = codec.createInputStream(defaultFileSystem.open(tmpPathWithExtension));
|
||||||
|
String actual = new String(ByteStreams.toByteArray(inStream), StandardCharsets.UTF_8.toString());
|
||||||
|
Assert.assertEquals("Strings not matching", DUMMY_STRING,actual);
|
||||||
|
inStream.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeStingToOutputStream(String string, OutputStream outStream) throws IOException
|
||||||
|
{
|
||||||
|
outStream.write(string.getBytes(StandardCharsets.UTF_8.toString()));
|
||||||
|
outStream.flush();
|
||||||
|
outStream.close();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,137 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.indexer;
|
||||||
|
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
import com.google.common.io.ByteStreams;
|
||||||
|
import com.metamx.common.ISE;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.mapred.JobContext;
|
||||||
|
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
import org.hamcrest.core.Is;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
|
||||||
|
public class UtilsTest
|
||||||
|
{
|
||||||
|
private static final String DUMMY_STRING = "Very important string";
|
||||||
|
private static final String TMP_FILE_NAME = "test_file";
|
||||||
|
private Configuration jobConfig;
|
||||||
|
private JobContext mockJobContext;
|
||||||
|
private Map expectedMap;
|
||||||
|
private File tmpFile;
|
||||||
|
private Path tmpPath;
|
||||||
|
private FileSystem defaultFileSystem;
|
||||||
|
private Set setOfKeys;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TemporaryFolder tmpFolder = new TemporaryFolder();
|
||||||
|
|
||||||
|
private class CreateValueFromKey implements Function
|
||||||
|
{
|
||||||
|
@Override public Object apply(Object input)
|
||||||
|
{
|
||||||
|
return input.toString() + DUMMY_STRING;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException
|
||||||
|
{
|
||||||
|
jobConfig = new Configuration();
|
||||||
|
mockJobContext = EasyMock.createMock(JobContext.class);
|
||||||
|
EasyMock.expect(mockJobContext.getConfiguration()).andReturn(jobConfig).anyTimes();
|
||||||
|
EasyMock.replay(mockJobContext);
|
||||||
|
|
||||||
|
setOfKeys = new HashSet();
|
||||||
|
setOfKeys.addAll(new ArrayList<>(Arrays.asList("key1","key2","key3")));
|
||||||
|
expectedMap = (Map<String, Object>) Maps.asMap(setOfKeys, new CreateValueFromKey());
|
||||||
|
|
||||||
|
tmpFile = tmpFolder.newFile(TMP_FILE_NAME);
|
||||||
|
tmpPath = new Path(tmpFile.getAbsolutePath());
|
||||||
|
defaultFileSystem = tmpPath.getFileSystem(jobConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown()
|
||||||
|
{
|
||||||
|
tmpFolder.delete();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExistsPlainFile() throws IOException
|
||||||
|
{
|
||||||
|
boolean expected = Utils.exists(mockJobContext,defaultFileSystem,tmpPath);
|
||||||
|
Assert.assertTrue("Should be true since file is created",expected);
|
||||||
|
tmpFolder.delete();
|
||||||
|
expected = Utils.exists(mockJobContext,defaultFileSystem,tmpPath);
|
||||||
|
Assert.assertFalse("Should be false since file is deleted",expected);
|
||||||
|
EasyMock.verify(mockJobContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPlainStoreThenGetStats() throws IOException
|
||||||
|
{
|
||||||
|
Utils.storeStats(mockJobContext, tmpPath,expectedMap);
|
||||||
|
Map actualMap = Utils.getStats(mockJobContext, tmpPath);
|
||||||
|
Assert.assertThat(actualMap,Is.is(actualMap));
|
||||||
|
EasyMock.verify(mockJobContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = ISE.class)
|
||||||
|
public void testExceptionInMakePathAndOutputStream() throws IOException
|
||||||
|
{
|
||||||
|
boolean overwrite = false;
|
||||||
|
Utils.makePathAndOutputStream(mockJobContext,tmpPath,overwrite);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPlainOpenInputStream() throws IOException
|
||||||
|
{
|
||||||
|
FileUtils.writeStringToFile(tmpFile, DUMMY_STRING);
|
||||||
|
InputStream inStream = Utils.openInputStream(mockJobContext, tmpPath);
|
||||||
|
Assert.assertNotNull(inStream);
|
||||||
|
String expected = new String(ByteStreams.toByteArray(inStream), StandardCharsets.UTF_8.toString());
|
||||||
|
Assert.assertEquals(expected, DUMMY_STRING);
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,8 +17,10 @@
|
||||||
|
|
||||||
package io.druid.indexer.partitions;
|
package io.druid.indexer.partitions;
|
||||||
|
|
||||||
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import io.druid.indexer.HadoopDruidIndexerConfigTest;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -26,6 +28,8 @@ import org.junit.Test;
|
||||||
*/
|
*/
|
||||||
public class HashedPartitionsSpecTest
|
public class HashedPartitionsSpecTest
|
||||||
{
|
{
|
||||||
|
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHashedPartitionsSpec() throws Exception
|
public void testHashedPartitionsSpec() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -33,7 +37,7 @@ public class HashedPartitionsSpecTest
|
||||||
final PartitionsSpec partitionsSpec;
|
final PartitionsSpec partitionsSpec;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
partitionsSpec = HadoopDruidIndexerConfigTest.jsonReadWriteRead(
|
partitionsSpec = jsonReadWriteRead(
|
||||||
"{"
|
"{"
|
||||||
+ " \"targetPartitionSize\":100,"
|
+ " \"targetPartitionSize\":100,"
|
||||||
+ " \"type\":\"hashed\""
|
+ " \"type\":\"hashed\""
|
||||||
|
@ -73,7 +77,7 @@ public class HashedPartitionsSpecTest
|
||||||
final PartitionsSpec partitionsSpec;
|
final PartitionsSpec partitionsSpec;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
partitionsSpec = HadoopDruidIndexerConfigTest.jsonReadWriteRead(
|
partitionsSpec = jsonReadWriteRead(
|
||||||
"{"
|
"{"
|
||||||
+ " \"type\":\"hashed\","
|
+ " \"type\":\"hashed\","
|
||||||
+ " \"numShards\":2"
|
+ " \"numShards\":2"
|
||||||
|
@ -111,4 +115,14 @@ public class HashedPartitionsSpecTest
|
||||||
|
|
||||||
Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
|
Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private <T> T jsonReadWriteRead(String s, Class<T> klass)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
return jsonMapper.readValue(jsonMapper.writeValueAsBytes(jsonMapper.readValue(s, klass)), klass);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.indexer.path;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class GranularUnprocessedPathSpecTest
|
||||||
|
{
|
||||||
|
@Test
|
||||||
|
public void testSetGetMaxBuckets()
|
||||||
|
{
|
||||||
|
GranularUnprocessedPathSpec granularUnprocessedPathSpec = new GranularUnprocessedPathSpec();
|
||||||
|
int maxBuckets = 5;
|
||||||
|
granularUnprocessedPathSpec.setMaxBuckets(maxBuckets);
|
||||||
|
Assert.assertEquals(maxBuckets,granularUnprocessedPathSpec.getMaxBuckets());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,70 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.indexer.path;
|
||||||
|
|
||||||
|
import com.metamx.common.Granularity;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
|
public class GranularityPathSpecTest
|
||||||
|
{
|
||||||
|
private GranularityPathSpec granularityPathSpec;
|
||||||
|
private final String TEST_STRING_PATH = "TEST";
|
||||||
|
private final String TEST_STRING_PATTERN = "*.TEST";
|
||||||
|
private final String TEST_STRING_FORMAT = "F_TEST";
|
||||||
|
|
||||||
|
@Before public void setUp()
|
||||||
|
{
|
||||||
|
granularityPathSpec = new GranularityPathSpec();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After public void tearDown()
|
||||||
|
{
|
||||||
|
granularityPathSpec = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testSetInputPath()
|
||||||
|
{
|
||||||
|
granularityPathSpec.setInputPath(TEST_STRING_PATH);
|
||||||
|
Assert.assertEquals(TEST_STRING_PATH,granularityPathSpec.getInputPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testSetFilePattern()
|
||||||
|
{
|
||||||
|
granularityPathSpec.setFilePattern(TEST_STRING_PATTERN);
|
||||||
|
Assert.assertEquals(TEST_STRING_PATTERN,granularityPathSpec.getFilePattern());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testSetPathFormat()
|
||||||
|
{
|
||||||
|
granularityPathSpec.setPathFormat(TEST_STRING_FORMAT);
|
||||||
|
Assert.assertEquals(TEST_STRING_FORMAT,granularityPathSpec.getPathFormat());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testSetDataGranularity()
|
||||||
|
{
|
||||||
|
Granularity granularity = Granularity.DAY;
|
||||||
|
granularityPathSpec.setDataGranularity(granularity);
|
||||||
|
Assert.assertEquals(granularity,granularityPathSpec.getDataGranularity());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue