HBASE-8611. Improve test coverage in pkg org.apache.hadoop.hbase.mapred
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1537567 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
faff0fa2d3
commit
b51e21446c
|
@ -21,24 +21,31 @@ package org.apache.hadoop.hbase.mapred;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.util.ProgramDriver;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Driver for hbase mapreduce jobs. Select which to run by passing
|
||||
* name of job to this main.
|
||||
* Driver for hbase mapreduce jobs. Select which to run by passing name of job
|
||||
* to this main.
|
||||
*/
|
||||
@Deprecated
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
public class Driver {
|
||||
|
||||
private static ProgramDriver pgd = new ProgramDriver();
|
||||
|
||||
@VisibleForTesting
|
||||
static void setProgramDriver(ProgramDriver pgd0) {
|
||||
pgd = pgd0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param args
|
||||
* @throws Throwable
|
||||
*/
|
||||
public static void main(String[] args) throws Throwable {
|
||||
ProgramDriver pgd = new ProgramDriver();
|
||||
pgd.addClass(RowCounter.NAME, RowCounter.class,
|
||||
"Count rows in HBase table");
|
||||
ProgramDriver.class.getMethod("driver", new Class [] {String[].class}).
|
||||
invoke(pgd, new Object[]{args});
|
||||
pgd.addClass(RowCounter.NAME, RowCounter.class, "Count rows in HBase table");
|
||||
ProgramDriver.class.getMethod("driver", new Class[] { String[].class })
|
||||
.invoke(pgd, new Object[] { args });
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapred;
|
||||
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.util.ProgramDriver;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestDriver {
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testDriverMainMethod() throws Throwable {
|
||||
ProgramDriver programDriverMock = mock(ProgramDriver.class);
|
||||
Driver.setProgramDriver(programDriverMock);
|
||||
Driver.main(new String[]{});
|
||||
verify(programDriverMock).driver(Mockito.any(String[].class));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,178 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapred;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.OutputCollector;
|
||||
import org.apache.hadoop.mapred.Reporter;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestGroupingTableMap {
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({ "deprecation", "unchecked" })
|
||||
public void shouldNotCallCollectonSinceFindUniqueKeyValueMoreThanOnes()
|
||||
throws Exception {
|
||||
GroupingTableMap gTableMap = null;
|
||||
try {
|
||||
Result result = mock(Result.class);
|
||||
Reporter reporter = mock(Reporter.class);
|
||||
gTableMap = new GroupingTableMap();
|
||||
Configuration cfg = new Configuration();
|
||||
cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
|
||||
JobConf jobConf = new JobConf(cfg);
|
||||
gTableMap.configure(jobConf);
|
||||
|
||||
byte[] row = {};
|
||||
List<Cell> keyValues = ImmutableList.<Cell>of(
|
||||
new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("1111")),
|
||||
new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("2222")),
|
||||
new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), Bytes.toBytes("3333")));
|
||||
when(result.listCells()).thenReturn(keyValues);
|
||||
OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
|
||||
mock(OutputCollector.class);
|
||||
gTableMap.map(null, result, outputCollectorMock, reporter);
|
||||
verify(result).listCells();
|
||||
verifyZeroInteractions(outputCollectorMock);
|
||||
} finally {
|
||||
if (gTableMap != null)
|
||||
gTableMap.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({ "deprecation", "unchecked" })
|
||||
public void shouldCreateNewKeyAlthoughExtraKey() throws Exception {
|
||||
GroupingTableMap gTableMap = null;
|
||||
try {
|
||||
Result result = mock(Result.class);
|
||||
Reporter reporter = mock(Reporter.class);
|
||||
gTableMap = new GroupingTableMap();
|
||||
Configuration cfg = new Configuration();
|
||||
cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
|
||||
JobConf jobConf = new JobConf(cfg);
|
||||
gTableMap.configure(jobConf);
|
||||
|
||||
byte[] row = {};
|
||||
List<Cell> keyValues = ImmutableList.<Cell>of(
|
||||
new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("1111")),
|
||||
new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), Bytes.toBytes("2222")),
|
||||
new KeyValue(row, "familyC".getBytes(), "qualifierC".getBytes(), Bytes.toBytes("3333")));
|
||||
when(result.listCells()).thenReturn(keyValues);
|
||||
OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
|
||||
mock(OutputCollector.class);
|
||||
gTableMap.map(null, result, outputCollectorMock, reporter);
|
||||
verify(result).listCells();
|
||||
verify(outputCollectorMock, times(1))
|
||||
.collect(any(ImmutableBytesWritable.class), any(Result.class));
|
||||
verifyNoMoreInteractions(outputCollectorMock);
|
||||
} finally {
|
||||
if (gTableMap != null)
|
||||
gTableMap.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({ "deprecation" })
|
||||
public void shouldCreateNewKey() throws Exception {
|
||||
GroupingTableMap gTableMap = null;
|
||||
try {
|
||||
Result result = mock(Result.class);
|
||||
Reporter reporter = mock(Reporter.class);
|
||||
final byte[] bSeparator = Bytes.toBytes(" ");
|
||||
gTableMap = new GroupingTableMap();
|
||||
Configuration cfg = new Configuration();
|
||||
cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
|
||||
JobConf jobConf = new JobConf(cfg);
|
||||
gTableMap.configure(jobConf);
|
||||
|
||||
final byte[] firstPartKeyValue = Bytes.toBytes("34879512738945");
|
||||
final byte[] secondPartKeyValue = Bytes.toBytes("35245142671437");
|
||||
byte[] row = {};
|
||||
List<Cell> cells = ImmutableList.<Cell>of(
|
||||
new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), firstPartKeyValue),
|
||||
new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), secondPartKeyValue));
|
||||
when(result.listCells()).thenReturn(cells);
|
||||
|
||||
final AtomicBoolean outputCollected = new AtomicBoolean();
|
||||
OutputCollector<ImmutableBytesWritable, Result> outputCollector =
|
||||
new OutputCollector<ImmutableBytesWritable, Result>() {
|
||||
@Override
|
||||
public void collect(ImmutableBytesWritable arg, Result result) throws IOException {
|
||||
assertArrayEquals(com.google.common.primitives.Bytes.concat(firstPartKeyValue, bSeparator,
|
||||
secondPartKeyValue), arg.copyBytes());
|
||||
outputCollected.set(true);
|
||||
}
|
||||
};
|
||||
|
||||
gTableMap.map(null, result, outputCollector, reporter);
|
||||
verify(result).listCells();
|
||||
Assert.assertTrue("Output not received", outputCollected.get());
|
||||
|
||||
final byte[] firstPartValue = Bytes.toBytes("238947928");
|
||||
final byte[] secondPartValue = Bytes.toBytes("4678456942345");
|
||||
byte[][] data = { firstPartValue, secondPartValue };
|
||||
ImmutableBytesWritable byteWritable = gTableMap.createGroupKey(data);
|
||||
assertArrayEquals(com.google.common.primitives.Bytes.concat(firstPartValue,
|
||||
bSeparator, secondPartValue), byteWritable.get());
|
||||
} finally {
|
||||
if (gTableMap != null)
|
||||
gTableMap.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({ "deprecation" })
|
||||
public void shouldReturnNullFromCreateGroupKey() throws Exception {
|
||||
GroupingTableMap gTableMap = null;
|
||||
try {
|
||||
gTableMap = new GroupingTableMap();
|
||||
assertNull(gTableMap.createGroupKey(null));
|
||||
} finally {
|
||||
if(gTableMap != null)
|
||||
gTableMap.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapred;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.mapred.OutputCollector;
|
||||
import org.apache.hadoop.mapred.Reporter;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestIdentityTableMap {
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({ "deprecation", "unchecked" })
|
||||
public void shouldCollectPredefinedTimes() throws IOException {
|
||||
int recordNumber = 999;
|
||||
Result resultMock = mock(Result.class);
|
||||
IdentityTableMap identityTableMap = null;
|
||||
try {
|
||||
Reporter reporterMock = mock(Reporter.class);
|
||||
identityTableMap = new IdentityTableMap();
|
||||
ImmutableBytesWritable bytesWritableMock = mock(ImmutableBytesWritable.class);
|
||||
OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
|
||||
mock(OutputCollector.class);
|
||||
|
||||
for (int i = 0; i < recordNumber; i++)
|
||||
identityTableMap.map(bytesWritableMock, resultMock, outputCollectorMock,
|
||||
reporterMock);
|
||||
|
||||
verify(outputCollectorMock, times(recordNumber)).collect(
|
||||
Mockito.any(ImmutableBytesWritable.class), Mockito.any(Result.class));
|
||||
} finally {
|
||||
if (identityTableMap != null)
|
||||
identityTableMap.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,162 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapred;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.mapred.RowCounter.RowCounterMapper;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.OutputCollector;
|
||||
import org.apache.hadoop.mapred.Reporter;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestRowCounter {
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void shouldPrintUsage() throws Exception {
|
||||
String expectedOutput = "rowcounter <outputdir> <tablename> <column1> [<column2>...]";
|
||||
String result = new OutputReader(System.out) {
|
||||
@Override
|
||||
void doRead() {
|
||||
assertEquals(-1, RowCounter.printUsage());
|
||||
}
|
||||
}.read();
|
||||
|
||||
assertTrue(result.startsWith(expectedOutput));
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void shouldExitAndPrintUsageSinceParameterNumberLessThanThree()
|
||||
throws Exception {
|
||||
final String[] args = new String[] { "one", "two" };
|
||||
String line = "ERROR: Wrong number of parameters: " + args.length;
|
||||
String result = new OutputReader(System.err) {
|
||||
@Override
|
||||
void doRead() throws Exception {
|
||||
assertEquals(-1, new RowCounter().run(args));
|
||||
}
|
||||
}.read();
|
||||
|
||||
assertTrue(result.startsWith(line));
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({ "deprecation", "unchecked" })
|
||||
public void shouldRegInReportEveryIncomingRow() throws IOException {
|
||||
int iterationNumber = 999;
|
||||
RowCounter.RowCounterMapper mapper = new RowCounter.RowCounterMapper();
|
||||
Reporter reporter = mock(Reporter.class);
|
||||
for (int i = 0; i < iterationNumber; i++)
|
||||
mapper.map(mock(ImmutableBytesWritable.class), mock(Result.class),
|
||||
mock(OutputCollector.class), reporter);
|
||||
|
||||
Mockito.verify(reporter, times(iterationNumber)).incrCounter(
|
||||
any(Enum.class), anyInt());
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({ "deprecation" })
|
||||
public void shouldCreateAndRunSubmittableJob() throws Exception {
|
||||
RowCounter rCounter = new RowCounter();
|
||||
rCounter.setConf(HBaseConfiguration.create());
|
||||
String[] args = new String[] { "\temp", "tableA", "column1", "column2",
|
||||
"column3" };
|
||||
JobConf jobConfig = rCounter.createSubmittableJob(args);
|
||||
|
||||
assertNotNull(jobConfig);
|
||||
assertEquals(0, jobConfig.getNumReduceTasks());
|
||||
assertEquals("rowcounter", jobConfig.getJobName());
|
||||
assertEquals(jobConfig.getMapOutputValueClass(), Result.class);
|
||||
assertEquals(jobConfig.getMapperClass(), RowCounterMapper.class);
|
||||
assertEquals(jobConfig.get(TableInputFormat.COLUMN_LIST), Joiner.on(' ')
|
||||
.join("column1", "column2", "column3"));
|
||||
assertEquals(jobConfig.getMapOutputKeyClass(), ImmutableBytesWritable.class);
|
||||
}
|
||||
|
||||
enum Outs {
|
||||
OUT, ERR
|
||||
}
|
||||
|
||||
private static abstract class OutputReader {
|
||||
private final PrintStream ps;
|
||||
private PrintStream oldPrintStream;
|
||||
private Outs outs;
|
||||
|
||||
protected OutputReader(PrintStream ps) {
|
||||
this.ps = ps;
|
||||
}
|
||||
|
||||
protected String read() throws Exception {
|
||||
ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
|
||||
if (ps == System.out) {
|
||||
oldPrintStream = System.out;
|
||||
outs = Outs.OUT;
|
||||
System.setOut(new PrintStream(outBytes));
|
||||
} else if (ps == System.err) {
|
||||
oldPrintStream = System.err;
|
||||
outs = Outs.ERR;
|
||||
System.setErr(new PrintStream(outBytes));
|
||||
} else {
|
||||
throw new IllegalStateException("OutputReader: unsupported PrintStream");
|
||||
}
|
||||
|
||||
try {
|
||||
doRead();
|
||||
return new String(outBytes.toByteArray());
|
||||
} finally {
|
||||
switch (outs) {
|
||||
case OUT: {
|
||||
System.setOut(oldPrintStream);
|
||||
break;
|
||||
}
|
||||
case ERR: {
|
||||
System.setErr(oldPrintStream);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
throw new IllegalStateException(
|
||||
"OutputReader: unsupported PrintStream");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
abstract void doRead() throws Exception;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapred;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestSplitTable {
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testSplitTableCompareTo() {
|
||||
TableSplit aTableSplit = new TableSplit(Bytes.toBytes("tableA"),
|
||||
Bytes.toBytes("aaa"), Bytes.toBytes("ddd"), "locationA");
|
||||
|
||||
TableSplit bTableSplit = new TableSplit(Bytes.toBytes("tableA"),
|
||||
Bytes.toBytes("iii"), Bytes.toBytes("kkk"), "locationA");
|
||||
|
||||
TableSplit cTableSplit = new TableSplit(Bytes.toBytes("tableA"),
|
||||
Bytes.toBytes("lll"), Bytes.toBytes("zzz"), "locationA");
|
||||
|
||||
assertTrue(aTableSplit.compareTo(aTableSplit) == 0);
|
||||
assertTrue(bTableSplit.compareTo(bTableSplit) == 0);
|
||||
assertTrue(cTableSplit.compareTo(cTableSplit) == 0);
|
||||
|
||||
assertTrue(aTableSplit.compareTo(bTableSplit) < 0);
|
||||
assertTrue(bTableSplit.compareTo(aTableSplit) > 0);
|
||||
|
||||
assertTrue(aTableSplit.compareTo(cTableSplit) < 0);
|
||||
assertTrue(cTableSplit.compareTo(aTableSplit) > 0);
|
||||
|
||||
assertTrue(bTableSplit.compareTo(cTableSplit) < 0);
|
||||
assertTrue(cTableSplit.compareTo(bTableSplit) > 0);
|
||||
|
||||
assertTrue(cTableSplit.compareTo(aTableSplit) > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testSplitTableEquals() {
|
||||
assertFalse(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"),
|
||||
Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes
|
||||
.toBytes("tableB"), Bytes.toBytes("aaa"), Bytes.toBytes("ddd"),
|
||||
"locationA")));
|
||||
|
||||
assertFalse(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"),
|
||||
Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes
|
||||
.toBytes("tableA"), Bytes.toBytes("bbb"), Bytes.toBytes("ddd"),
|
||||
"locationA")));
|
||||
|
||||
assertFalse(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"),
|
||||
Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes
|
||||
.toBytes("tableA"), Bytes.toBytes("aaa"), Bytes.toBytes("eee"),
|
||||
"locationA")));
|
||||
|
||||
assertFalse(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"),
|
||||
Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes
|
||||
.toBytes("tableA"), Bytes.toBytes("aaa"), Bytes.toBytes("ddd"),
|
||||
"locationB")));
|
||||
|
||||
assertTrue(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"),
|
||||
Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes
|
||||
.toBytes("tableA"), Bytes.toBytes("aaa"), Bytes.toBytes("ddd"),
|
||||
"locationA")));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,274 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapred;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.LargeTests;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.mapred.JobClient;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.MapReduceBase;
|
||||
import org.apache.hadoop.mapred.OutputCollector;
|
||||
import org.apache.hadoop.mapred.Reporter;
|
||||
import org.apache.hadoop.mapred.RunningJob;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
@Category(LargeTests.class)
|
||||
public class TestTableMapReduceUtil {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(TestTableMapReduceUtil.class);
|
||||
|
||||
private static HTable presidentsTable;
|
||||
private static final String TABLE_NAME = "People";
|
||||
|
||||
private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
|
||||
private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");
|
||||
|
||||
private static ImmutableSet<String> presidentsRowKeys = ImmutableSet.of(
|
||||
"president1", "president2", "president3");
|
||||
private static Iterator<String> presidentNames = ImmutableSet.of(
|
||||
"John F. Kennedy", "George W. Bush", "Barack Obama").iterator();
|
||||
|
||||
private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1",
|
||||
"actor2");
|
||||
private static Iterator<String> actorNames = ImmutableSet.of(
|
||||
"Jack Nicholson", "Martin Freeman").iterator();
|
||||
|
||||
private static String PRESIDENT_PATTERN = "president";
|
||||
private static String ACTOR_PATTERN = "actor";
|
||||
private static ImmutableMap<String, ImmutableSet<String>> relation = ImmutableMap
|
||||
.of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
UTIL.startMiniCluster();
|
||||
presidentsTable = createAndFillTable(Bytes.toBytes(TABLE_NAME));
|
||||
UTIL.startMiniMapReduceCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
UTIL.shutdownMiniMapReduceCluster();
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() throws IOException {
|
||||
LOG.info("before");
|
||||
UTIL.ensureSomeRegionServersAvailable(1);
|
||||
LOG.info("before done");
|
||||
}
|
||||
|
||||
public static HTable createAndFillTable(byte[] tableName) throws IOException {
|
||||
HTable table = UTIL.createTable(tableName, COLUMN_FAMILY);
|
||||
createPutCommand(table);
|
||||
return table;
|
||||
}
|
||||
|
||||
private static void createPutCommand(HTable table) throws IOException {
|
||||
for (String president : presidentsRowKeys) {
|
||||
if (presidentNames.hasNext()) {
|
||||
Put p = new Put(Bytes.toBytes(president));
|
||||
p.add(COLUMN_FAMILY, COLUMN_QUALIFIER,
|
||||
Bytes.toBytes(presidentNames.next()));
|
||||
table.put(p);
|
||||
}
|
||||
}
|
||||
|
||||
for (String actor : actorsRowKeys) {
|
||||
if (actorNames.hasNext()) {
|
||||
Put p = new Put(Bytes.toBytes(actor));
|
||||
p.add(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next()));
|
||||
table.put(p);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check what the given number of reduce tasks for the given job configuration
|
||||
* does not exceed the number of regions for the given table.
|
||||
*/
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable()
|
||||
throws IOException {
|
||||
Assert.assertNotNull(presidentsTable);
|
||||
Configuration cfg = UTIL.getConfiguration();
|
||||
JobConf jobConf = new JobConf(cfg);
|
||||
TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
|
||||
TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
|
||||
TableMapReduceUtil.setScannerCaching(jobConf, 100);
|
||||
assertEquals(1, jobConf.getNumReduceTasks());
|
||||
assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));
|
||||
|
||||
jobConf.setNumReduceTasks(10);
|
||||
TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
|
||||
TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
|
||||
assertEquals(1, jobConf.getNumReduceTasks());
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable()
|
||||
throws IOException {
|
||||
Configuration cfg = UTIL.getConfiguration();
|
||||
JobConf jobConf = new JobConf(cfg);
|
||||
TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
|
||||
TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
|
||||
assertEquals(1, jobConf.getNumMapTasks());
|
||||
|
||||
jobConf.setNumMapTasks(10);
|
||||
TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
|
||||
TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
|
||||
assertEquals(1, jobConf.getNumMapTasks());
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void shoudBeValidMapReduceEvaluation() throws Exception {
|
||||
Configuration cfg = UTIL.getConfiguration();
|
||||
JobConf jobConf = new JobConf(cfg);
|
||||
try {
|
||||
jobConf.setJobName("process row task");
|
||||
jobConf.setNumReduceTasks(1);
|
||||
TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
|
||||
ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
|
||||
jobConf);
|
||||
TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
|
||||
ClassificatorRowReduce.class, jobConf);
|
||||
RunningJob job = JobClient.runJob(jobConf);
|
||||
assertTrue(job.isSuccessful());
|
||||
} finally {
|
||||
if (jobConf != null)
|
||||
FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void shoudBeValidMapReduceWithPartitionerEvaluation()
|
||||
throws IOException {
|
||||
Configuration cfg = UTIL.getConfiguration();
|
||||
JobConf jobConf = new JobConf(cfg);
|
||||
try {
|
||||
jobConf.setJobName("process row task");
|
||||
jobConf.setNumReduceTasks(2);
|
||||
TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
|
||||
ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
|
||||
jobConf);
|
||||
|
||||
TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
|
||||
ClassificatorRowReduce.class, jobConf, HRegionPartitioner.class);
|
||||
RunningJob job = JobClient.runJob(jobConf);
|
||||
assertTrue(job.isSuccessful());
|
||||
} finally {
|
||||
if (jobConf != null)
|
||||
FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
static class ClassificatorRowReduce extends MapReduceBase implements
|
||||
TableReduce<ImmutableBytesWritable, Put> {
|
||||
|
||||
@Override
|
||||
public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
|
||||
OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
|
||||
throws IOException {
|
||||
String strKey = Bytes.toString(key.get());
|
||||
List<Put> result = new ArrayList<Put>();
|
||||
while (values.hasNext())
|
||||
result.add(values.next());
|
||||
|
||||
if (relation.keySet().contains(strKey)) {
|
||||
Set<String> set = relation.get(strKey);
|
||||
if (set != null) {
|
||||
assertEquals(set.size(), result.size());
|
||||
} else {
|
||||
throwAccertionError("Test infrastructure error: set is null");
|
||||
}
|
||||
} else {
|
||||
throwAccertionError("Test infrastructure error: key not found in map");
|
||||
}
|
||||
}
|
||||
|
||||
private void throwAccertionError(String errorMessage) throws AssertionError {
|
||||
throw new AssertionError(errorMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
static class ClassificatorMapper extends MapReduceBase implements
|
||||
TableMap<ImmutableBytesWritable, Put> {
|
||||
|
||||
@Override
|
||||
public void map(ImmutableBytesWritable row, Result result,
|
||||
OutputCollector<ImmutableBytesWritable, Put> outCollector,
|
||||
Reporter reporter) throws IOException {
|
||||
String rowKey = Bytes.toString(result.getRow());
|
||||
final ImmutableBytesWritable pKey = new ImmutableBytesWritable(
|
||||
Bytes.toBytes(PRESIDENT_PATTERN));
|
||||
final ImmutableBytesWritable aKey = new ImmutableBytesWritable(
|
||||
Bytes.toBytes(ACTOR_PATTERN));
|
||||
ImmutableBytesWritable outKey = null;
|
||||
|
||||
if (rowKey.startsWith(PRESIDENT_PATTERN)) {
|
||||
outKey = pKey;
|
||||
} else if (rowKey.startsWith(ACTOR_PATTERN)) {
|
||||
outKey = aKey;
|
||||
} else {
|
||||
throw new AssertionError("unexpected rowKey");
|
||||
}
|
||||
|
||||
String name = Bytes.toString(result.getValue(COLUMN_FAMILY,
|
||||
COLUMN_QUALIFIER));
|
||||
outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2")).add(
|
||||
COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name)));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue