HBASE-8534 Fix coverage for org.apache.hadoop.hbase.mapreduce (Aleksey Gorshkov)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1488542 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-06-01 16:23:55 +00:00
parent ac54c557e9
commit 191ba57a27
12 changed files with 1151 additions and 81 deletions

View File

@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IndexBuilder.Map;
import org.apache.hadoop.hbase.mapreduce.SampleUploader.Uploader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@Category(LargeTests.class)
public class TestMapReduceExamples {
private static HBaseTestingUtility util = new HBaseTestingUtility();
/**
* Test SampleUploader from examples
*/
@SuppressWarnings("unchecked")
@Test
public void testSampleUploader() throws Exception {
Configuration configuration = new Configuration();
Uploader uploader = new Uploader();
Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context ctx = mock(Context.class);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
Put put = (Put) invocation.getArguments()[1];
assertEquals("row", Bytes.toString(writer.get()));
assertEquals("row", Bytes.toString(put.getRow()));
return null;
}
}).when(ctx).write(any(ImmutableBytesWritable.class), any(Put.class));
uploader.map(null, new Text("row,family,qualifier,value"), ctx);
Path dir = util.getDataTestDirOnTestFS("testSampleUploader");
String[] args = { dir.toString(), "simpleTable" };
Job job = SampleUploader.configureJob(configuration, args);
assertEquals(SequenceFileInputFormat.class, job.getInputFormatClass());
}
/**
* Test main method of SampleUploader.
*/
@Test
public void testMainSampleUploader() throws Exception {
PrintStream oldPrintStream = System.err;
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
System.setSecurityManager(newSecurityManager);
ByteArrayOutputStream data = new ByteArrayOutputStream();
String[] args = {};
System.setErr(new PrintStream(data));
try {
System.setErr(new PrintStream(data));
try {
SampleUploader.main(args);
fail("should be SecurityException");
} catch (SecurityException e) {
assertEquals(-1, newSecurityManager.getExitCode());
assertTrue(data.toString().contains("Wrong number of arguments:"));
assertTrue(data.toString().contains("Usage: SampleUploader <input> <tablename>"));
}
} finally {
System.setErr(oldPrintStream);
System.setSecurityManager(SECURITY_MANAGER);
}
}
/**
* Test IndexBuilder from examples
*/
@SuppressWarnings("unchecked")
@Test
public void testIndexBuilder() throws Exception {
Configuration configuration = new Configuration();
String[] args = { "tableName", "columnFamily", "column1", "column2" };
IndexBuilder.configureJob(configuration, args);
assertEquals("tableName", configuration.get("index.tablename"));
assertEquals("tableName", configuration.get(TableInputFormat.INPUT_TABLE));
assertEquals("column1,column2", configuration.get("index.fields"));
Map map = new Map();
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(Bytes.toBytes("test"));
Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context ctx =
mock(Context.class);
when(ctx.getConfiguration()).thenReturn(configuration);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
Put put = (Put) invocation.getArguments()[1];
assertEquals("tableName-column1", Bytes.toString(writer.get()));
assertEquals("test", Bytes.toString(put.getRow()));
return null;
}
}).when(ctx).write(any(ImmutableBytesWritable.class), any(Put.class));
Result result = mock(Result.class);
when(result.getValue(Bytes.toBytes("columnFamily"), Bytes.toBytes("column1"))).thenReturn(
Bytes.toBytes("test"));
map.setup(ctx);
map.map(rowKey, result, ctx);
}
/**
* Test main method of IndexBuilder
*/
@Test
public void testMainIndexBuilder() throws Exception {
PrintStream oldPrintStream = System.err;
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
System.setSecurityManager(newSecurityManager);
ByteArrayOutputStream data = new ByteArrayOutputStream();
String[] args = {};
System.setErr(new PrintStream(data));
try {
System.setErr(new PrintStream(data));
try {
IndexBuilder.main(args);
fail("should be SecurityException");
} catch (SecurityException e) {
assertEquals(-1, newSecurityManager.getExitCode());
assertTrue(data.toString().contains("arguments supplied, required: 3"));
assertTrue(data.toString().contains(
"Usage: IndexBuilder <TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]"));
}
} finally {
System.setErr(oldPrintStream);
System.setSecurityManager(SECURITY_MANAGER);
}
}
}

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.*;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@Category(LargeTests.class)
public class TestCellCounter {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final byte[] ROW1 = Bytes.toBytes("row1");
private static final byte[] ROW2 = Bytes.toBytes("row2");
private static final String FAMILY_A_STRING = "a";
private static final String FAMILY_B_STRING = "b";
private static final byte[] FAMILY_A = Bytes.toBytes(FAMILY_A_STRING);
private static final byte[] FAMILY_B = Bytes.toBytes(FAMILY_B_STRING);
private static final byte[] QUALIFIER = Bytes.toBytes("q");
private static Path FQ_OUTPUT_DIR;
private static final String OUTPUT_DIR = "target" + File.separator + "test-data" + File.separator
+ "output";
private static long now = System.currentTimeMillis();
@BeforeClass
public static void beforeClass() throws Exception {
UTIL.startMiniCluster();
UTIL.startMiniMapReduceCluster();
FQ_OUTPUT_DIR = new Path(OUTPUT_DIR).makeQualified(new LocalFileSystem());
FileUtil.fullyDelete(new File(OUTPUT_DIR));
}
@AfterClass
public static void afterClass() throws Exception {
UTIL.shutdownMiniMapReduceCluster();
UTIL.shutdownMiniCluster();
}
/**
* Test CellCounter all data should print to output
*
*/
@Test
public void testCellCounter() throws Exception {
String sourceTable = "sourceTable";
byte[][] families = { FAMILY_A, FAMILY_B };
HTable t = UTIL.createTable(Bytes.toBytes(sourceTable), families);
try{
Put p = new Put(ROW1);
p.add(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11"));
p.add(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12"));
p.add(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13"));
t.put(p);
p = new Put(ROW2);
p.add(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21"));
p.add(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22"));
p.add(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23"));
t.put(p);
String[] args = { sourceTable, FQ_OUTPUT_DIR.toString(), ";", "^row1" };
runCount(args);
FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator +
"part-r-00000");
String data = IOUtils.toString(inputStream);
inputStream.close();
assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2"));
assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "2"));
assertTrue(data.contains("Total ROWS" + "\t" + "1"));
assertTrue(data.contains("b;q" + "\t" + "1"));
assertTrue(data.contains("a;q" + "\t" + "1"));
assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1"));
assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1"));
}finally{
t.close();
}
}
private boolean runCount(String[] args) throws IOException, InterruptedException,
ClassNotFoundException {
// need to make a copy of the configuration because to make sure
// different temp dirs are used.
GenericOptionsParser opts = new GenericOptionsParser(
new Configuration(UTIL.getConfiguration()), args);
Configuration configuration = opts.getConfiguration();
args = opts.getRemainingArgs();
Job job = CellCounter.createSubmittableJob(configuration, args);
job.waitForCompletion(false);
return job.isSuccessful();
}
/**
* Test main method of CellCounter
*/
@Test
public void testCellCounterMain() throws Exception {
PrintStream oldPrintStream = System.err;
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
System.setSecurityManager(newSecurityManager);
ByteArrayOutputStream data = new ByteArrayOutputStream();
String[] args = {};
System.setErr(new PrintStream(data));
try {
System.setErr(new PrintStream(data));
try {
CellCounter.main(args);
fail("should be SecurityException");
} catch (SecurityException e) {
assertEquals(-1, newSecurityManager.getExitCode());
assertTrue(data.toString().contains("ERROR: Wrong number of parameters:"));
// should be information about usage
assertTrue(data.toString().contains("Usage:"));
}
} finally {
System.setErr(oldPrintStream);
System.setSecurityManager(SECURITY_MANAGER);
}
}
}

View File

@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
@ -28,10 +25,19 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.conf.Configuration;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import static org.junit.Assert.*;
/**
* Basic test for the CopyTable M/R tool
@ -40,6 +46,14 @@ import org.junit.experimental.categories.Category;
public class TestCopyTable {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static MiniHBaseCluster cluster;
private static final byte[] ROW1 = Bytes.toBytes("row1");
private static final byte[] ROW2 = Bytes.toBytes("row2");
private static final String FAMILY_A_STRING = "a";
private static final String FAMILY_B_STRING = "b";
private static final byte[] FAMILY_A = Bytes.toBytes(FAMILY_A_STRING);
private static final byte[] FAMILY_B = Bytes.toBytes(FAMILY_B_STRING);
private static final byte[] QUALIFIER = Bytes.toBytes("q");
@BeforeClass
public static void beforeClass() throws Exception {
@ -145,4 +159,102 @@ public class TestCopyTable {
TEST_UTIL.deleteTable(TABLENAME1);
TEST_UTIL.deleteTable(TABLENAME2);
}
/**
* Test copy of table from sourceTable to targetTable all rows from family a
*/
@Test
public void testRenameFamily() throws Exception {
String sourceTable = "sourceTable";
String targetTable = "targetTable";
byte[][] families = { FAMILY_A, FAMILY_B };
HTable t = TEST_UTIL.createTable(Bytes.toBytes(sourceTable), families);
HTable t2 = TEST_UTIL.createTable(Bytes.toBytes(targetTable), families);
Put p = new Put(ROW1);
p.add(FAMILY_A, QUALIFIER, Bytes.toBytes("Data11"));
p.add(FAMILY_B, QUALIFIER, Bytes.toBytes("Data12"));
p.add(FAMILY_A, QUALIFIER, Bytes.toBytes("Data13"));
t.put(p);
p = new Put(ROW2);
p.add(FAMILY_B, QUALIFIER, Bytes.toBytes("Dat21"));
p.add(FAMILY_A, QUALIFIER, Bytes.toBytes("Data22"));
p.add(FAMILY_B, QUALIFIER, Bytes.toBytes("Data23"));
t.put(p);
long currentTime = System.currentTimeMillis();
String[] args = new String[] { "--new.name=" + targetTable, "--families=a:b", "--all.cells",
"--starttime=" + (currentTime - 100000), "--endtime=" + (currentTime + 100000),
"--versions=1", sourceTable };
assertNull(t2.get(new Get(ROW1)).getRow());
clean();
assertTrue(runCopy(args));
assertNotNull(t2.get(new Get(ROW1)).getRow());
Result res = t2.get(new Get(ROW1));
byte[] b1 = res.getValue(FAMILY_B, QUALIFIER);
assertEquals("Data13", new String(b1));
assertNotNull(t2.get(new Get(ROW2)).getRow());
res = t2.get(new Get(ROW2));
b1 = res.getValue(FAMILY_A, QUALIFIER);
// Data from the family of B is not copied
assertNull(b1);
}
/**
* Test main method of CopyTable.
*/
@Test
public void testMainMethod() throws Exception {
String[] emptyArgs = { "-h" };
PrintStream oldWriter = System.err;
ByteArrayOutputStream data = new ByteArrayOutputStream();
PrintStream writer = new PrintStream(data);
System.setErr(writer);
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
System.setSecurityManager(newSecurityManager);
try {
CopyTable.main(emptyArgs);
fail("should be exit");
} catch (SecurityException e) {
assertEquals(1, newSecurityManager.getExitCode());
} finally {
System.setErr(oldWriter);
System.setSecurityManager(SECURITY_MANAGER);
}
assertTrue(data.toString().contains("rs.class"));
// should print usage information
assertTrue(data.toString().contains("Usage:"));
}
private boolean runCopy(String[] args) throws IOException, InterruptedException,
ClassNotFoundException {
GenericOptionsParser opts = new GenericOptionsParser(
new Configuration(TEST_UTIL.getConfiguration()), args);
Configuration configuration = opts.getConfiguration();
args = opts.getRemainingArgs();
clean();
Job job = CopyTable.createSubmittableJob(configuration, args);
job.waitForCompletion(false);
return job.isSuccessful();
}
private void clean() {
CopyTable.startTime = 0;
CopyTable.endTime = 0;
CopyTable.versions = -1;
CopyTable.tableName = null;
CopyTable.startRow = null;
CopyTable.stopRow = null;
CopyTable.newTableName = null;
CopyTable.peerAddress = null;
CopyTable.families = null;
CopyTable.allCells = false;
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestDriver {
/**
* Test main method of Driver class
*/
@Test
public void testDriver() throws Throwable {
PrintStream oldPrintStream = System.out;
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
System.setSecurityManager(newSecurityManager);
ByteArrayOutputStream data = new ByteArrayOutputStream();
String[] args = {};
System.setOut(new PrintStream(data));
try {
System.setOut(new PrintStream(data));
try {
Driver.main(args);
fail("should be SecurityException");
} catch (InvocationTargetException e) {
assertEquals(-1, newSecurityManager.getExitCode());
assertTrue(data.toString().contains(
"An example program must be given as the first argument."));
assertTrue(data.toString().contains("CellCounter: Count cells in HBase table"));
assertTrue(data.toString().contains("completebulkload: Complete a bulk data load."));
assertTrue(data.toString().contains(
"copytable: Export a table from local cluster to peer cluster"));
assertTrue(data.toString().contains("export: Write table data to HDFS."));
assertTrue(data.toString().contains("import: Import data written by Export."));
assertTrue(data.toString().contains("importtsv: Import data in TSV format."));
assertTrue(data.toString().contains("rowcounter: Count rows in HBase table"));
}
} finally {
System.setOut(oldPrintStream);
System.setSecurityManager(SECURITY_MANAGER);
}
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Mapper;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.mockito.Mockito.*;
@Category(SmallTests.class)
public class TestGroupingTableMapper {
/**
* Test GroupingTableMapper class
*/
@Test
public void testGroupingTableMapper() throws Exception {
GroupingTableMapper mapper = new GroupingTableMapper();
Configuration configuration = new Configuration();
configuration.set(GroupingTableMapper.GROUP_COLUMNS, "family1:clm family2:clm");
mapper.setConf(configuration);
Result result = mock(Result.class);
@SuppressWarnings("unchecked")
Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Result>.Context context =
mock(Mapper.Context.class);
context.write(any(ImmutableBytesWritable.class), any(Result.class));
List<KeyValue> keyValue = new ArrayList<KeyValue>();
byte[] row = {};
keyValue.add(new KeyValue(row, Bytes.toBytes("family2"), Bytes.toBytes("clm"), Bytes
.toBytes("value1")));
keyValue.add(new KeyValue(row, Bytes.toBytes("family1"), Bytes.toBytes("clm"), Bytes
.toBytes("value2")));
when(result.list()).thenReturn(keyValue);
mapper.map(null, result, context);
// template data
byte[][] data = { Bytes.toBytes("value1"), Bytes.toBytes("value2") };
ImmutableBytesWritable ibw = mapper.createGroupKey(data);
verify(context).write(ibw, result);
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
@Category(MediumTests.class)
public class TestHRegionPartitioner {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@BeforeClass
public static void beforeClass() throws Exception {
UTIL.startMiniCluster();
UTIL.startMiniMapReduceCluster();
}
@AfterClass
public static void afterClass() throws Exception {
UTIL.shutdownMiniMapReduceCluster();
UTIL.shutdownMiniCluster();
}
/**
* Test HRegionPartitioner
*/
@Test
public void testHRegionPartitioner() throws Exception {
byte[][] families = { Bytes.toBytes("familyA"), Bytes.toBytes("familyB") };
UTIL.createTable(Bytes.toBytes("out_table"), families, 1, Bytes.toBytes("aa"),
Bytes.toBytes("cc"), 3);
HRegionPartitioner<Long, Long> partitioner = new HRegionPartitioner<Long, Long>();
Configuration configuration = UTIL.getConfiguration();
configuration.set(TableOutputFormat.OUTPUT_TABLE, "out_table");
partitioner.setConf(configuration);
ImmutableBytesWritable writable = new ImmutableBytesWritable(Bytes.toBytes("bb"));
assertEquals(1, partitioner.getPartition(writable, 10L, 3));
assertEquals(0, partitioner.getPartition(writable, 10L, 1));
}
}

View File

@ -20,8 +20,13 @@ package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -39,9 +44,14 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.util.GenericOptionsParser;
import org.junit.After;
import org.junit.AfterClass;
@ -50,6 +60,10 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Mockito.*;
/**
* Tests the table import and table export MR job functionality
@ -213,6 +227,7 @@ public class TestImportExport {
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
fs.delete(new Path(FQ_OUTPUT_DIR), true);
t.close();
}
@Test
@ -355,4 +370,120 @@ public class TestImportExport {
results.close();
return count;
}
/**
* test main method. Import should print help and call System.exit
*/
@Test
public void testImportMain() throws Exception {
PrintStream oldPrintStream = System.err;
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
System.setSecurityManager(newSecurityManager);
ByteArrayOutputStream data = new ByteArrayOutputStream();
String[] args = {};
System.setErr(new PrintStream(data));
try {
System.setErr(new PrintStream(data));
Import.main(args);
fail("should be SecurityException");
} catch (SecurityException e) {
assertEquals(-1, newSecurityManager.getExitCode());
assertTrue(data.toString().contains("Wrong number of arguments:"));
assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
assertTrue(data.toString().contains("-Dmapred.reduce.tasks.speculative.execution=false"));
} finally {
System.setErr(oldPrintStream);
System.setSecurityManager(SECURITY_MANAGER);
}
}
/**
* test main method. Export should print help and call System.exit
*/
@Test
public void testExportMain() throws Exception {
PrintStream oldPrintStream = System.err;
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
System.setSecurityManager(newSecurityManager);
ByteArrayOutputStream data = new ByteArrayOutputStream();
String[] args = {};
System.setErr(new PrintStream(data));
try {
System.setErr(new PrintStream(data));
Export.main(args);
fail("should be SecurityException");
} catch (SecurityException e) {
assertEquals(-1, newSecurityManager.getExitCode());
assertTrue(data.toString().contains("Wrong number of arguments:"));
assertTrue(data.toString().contains(
"Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
"[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
assertTrue(data.toString().contains("-D hbase.mapreduce.scan.column.family=<familyName>"));
assertTrue(data.toString().contains("-D hbase.mapreduce.include.deleted.rows=true"));
assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
assertTrue(data.toString().contains("-Dmapred.map.tasks.speculative.execution=false"));
assertTrue(data.toString().contains("-Dmapred.reduce.tasks.speculative.execution=false"));
assertTrue(data.toString().contains("-Dhbase.export.scanner.batch=10"));
} finally {
System.setErr(oldPrintStream);
System.setSecurityManager(SECURITY_MANAGER);
}
}
/**
* Test map method of Importer
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testKeyValueImporter() throws Exception {
KeyValueImporter importer = new KeyValueImporter();
Configuration configuration = new Configuration();
Context ctx = mock(Context.class);
when(ctx.getConfiguration()).thenReturn(configuration);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
KeyValue key = (KeyValue) invocation.getArguments()[1];
assertEquals("Key", Bytes.toString(writer.get()));
assertEquals("row", Bytes.toString(key.getRow()));
return null;
}
}).when(ctx).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
importer.setup(ctx);
Result value = mock(Result.class);
KeyValue[] keys = {
new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
Bytes.toBytes("value")),
new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
Bytes.toBytes("value1")) };
when(value.raw()).thenReturn(keys);
importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
}
/**
* Test addFilterAndArguments method of Import This method set couple
* parameters into Configuration
*/
@Test
public void testAddFilterAndArguments() {
Configuration configuration = new Configuration();
List<String> args = new ArrayList<String>();
args.add("param1");
args.add("param2");
Import.addFilterAndArguments(configuration, FilterBase.class, args);
assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
configuration.get(Import.FILTER_CLASS_CONF_KEY));
assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
}
}

View File

@ -18,22 +18,12 @@
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collection;
import java.util.TreeMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
@ -41,9 +31,16 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.TreeMap;
import static org.junit.Assert.*;
/**
* Test cases for the "load" half of the HFileOutputFormat bulk load
* functionality. These tests run faster than the full MR cluster
@ -146,17 +143,15 @@ public class TestLoadIncrementalHFiles {
final byte[] TABLE = Bytes.toBytes("mytable_"+testName);
HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
HTableDescriptor htd = new HTableDescriptor(TABLE);
HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
familyDesc.setBloomFilterType(bloomType);
htd.addFamily(familyDesc);
admin.createTable(htd, SPLIT_KEYS);
HTable table = new HTable(util.getConfiguration(), TABLE);
util.waitTableEnabled(TABLE);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration(), useSecure);
loader.doBulkLoad(dir, table);
String [] args= {dir.toString(),"mytable_"+testName};
loader.run(args);
HTable table = new HTable(util.getConfiguration(), TABLE);
assertEquals(expectedRows, util.countRows(table));
}
@ -167,7 +162,7 @@ public class TestLoadIncrementalHFiles {
@Test
public void testNonexistentColumnFamilyLoad() throws Exception {
String testName = "testNonexistentColumnFamilyLoad";
byte[][][] hfileRanges = new byte[][][] {
byte[][][] hFileRanges = new byte[][][] {
new byte[][]{ Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
};
@ -177,12 +172,12 @@ public class TestLoadIncrementalHFiles {
dir = dir.makeQualified(fs);
Path familyDir = new Path(dir, Bytes.toString(FAMILY));
int hfileIdx = 0;
for (byte[][] range : hfileRanges) {
int hFileIdx = 0;
for (byte[][] range : hFileRanges) {
byte[] from = range[0];
byte[] to = range[1];
createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
+ hFileIdx++), FAMILY, QUALIFIER, from, to, 1000);
}
final byte[] TABLE = Bytes.toBytes("mytable_"+testName);
@ -214,55 +209,6 @@ public class TestLoadIncrementalHFiles {
admin.close();
}
private void verifyAssignedSequenceNumber(String testName,
byte[][][] hfileRanges, boolean nonZero) throws Exception {
Path dir = util.getDataTestDir(testName);
FileSystem fs = util.getTestFileSystem();
dir = dir.makeQualified(fs);
Path familyDir = new Path(dir, Bytes.toString(FAMILY));
int hfileIdx = 0;
for (byte[][] range : hfileRanges) {
byte[] from = range[0];
byte[] to = range[1];
createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
}
final byte[] TABLE = Bytes.toBytes("mytable_"+testName);
HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
HTableDescriptor htd = new HTableDescriptor(TABLE);
HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
htd.addFamily(familyDesc);
admin.createTable(htd, SPLIT_KEYS);
HTable table = new HTable(util.getConfiguration(), TABLE);
util.waitTableEnabled(TABLE);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
util.getConfiguration());
// Do a dummy put to increase the hlog sequence number
Put put = new Put(Bytes.toBytes("row"));
put.add(FAMILY, QUALIFIER, Bytes.toBytes("value"));
table.put(put);
loader.doBulkLoad(dir, table);
// Get the store files
Collection<StoreFile> files = util.getHBaseCluster().
getRegions(TABLE).get(0).getStore(FAMILY).getStorefiles();
for (StoreFile file: files) {
// the sequenceId gets initialized during createReader
file.createReader();
if (nonZero)
assertTrue(file.getMaxSequenceId() > 0);
else
assertTrue(file.getMaxSequenceId() == -1);
}
}
@Test
public void testSplitStoreFile() throws IOException {
Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
@ -287,9 +233,9 @@ public class TestLoadIncrementalHFiles {
}
private int verifyHFile(Path p) throws IOException {
Configuration conf = util.getConfiguration();
Configuration configuration = util.getConfiguration();
HFile.Reader reader = HFile.createReader(
p.getFileSystem(conf), p, new CacheConfig(conf));
p.getFileSystem(configuration), p, new CacheConfig(configuration));
reader.loadFileInfo();
HFileScanner scanner = reader.getScanner(false, false);
scanner.seekTo();
@ -309,12 +255,12 @@ public class TestLoadIncrementalHFiles {
* TODO put me in an HFileTestUtil or something?
*/
static void createHFile(
Configuration conf,
Configuration configuration,
FileSystem fs, Path path,
byte[] family, byte[] qualifier,
byte[] startKey, byte[] endKey, int numRows) throws IOException
{
HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
HFile.Writer writer = HFile.getWriterFactory(configuration, new CacheConfig(configuration))
.withPath(fs, path)
.withBlockSize(BLOCKSIZE)
.withCompression(COMPRESSION)
@ -335,10 +281,10 @@ public class TestLoadIncrementalHFiles {
}
private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
Integer value = map.containsKey(first)?(Integer)map.get(first):0;
Integer value = map.containsKey(first)?map.get(first):0;
map.put(first, value+1);
value = map.containsKey(last)?(Integer)map.get(last):0;
value = map.containsKey(last)?map.get(last):0;
map.put(last, value-1);
}

View File

@ -20,8 +20,11 @@ package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
@ -33,6 +36,7 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
@ -170,4 +174,56 @@ public class TestRowCounter {
}
table.put(rowsUpdate);
}
/**
* test main method. Import should print help and call System.exit
*/
@Test
public void testImportMain() throws Exception {
PrintStream oldPrintStream = System.err;
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
System.setSecurityManager(newSecurityManager);
ByteArrayOutputStream data = new ByteArrayOutputStream();
String[] args = {};
System.setErr(new PrintStream(data));
try {
System.setErr(new PrintStream(data));
try {
RowCounter.main(args);
fail("should be SecurityException");
} catch (SecurityException e) {
assertEquals(-1, newSecurityManager.getExitCode());
assertTrue(data.toString().contains("Wrong number of parameters:"));
assertTrue(data.toString().contains(
"Usage: RowCounter [options] <tablename> [--range=[startKey],[endKey]] " +
"[<column1> <column2>...]"));
assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
assertTrue(data.toString().contains("-Dmapred.map.tasks.speculative.execution=false"));
}
data.reset();
try {
args = new String[2];
args[0] = "table";
args[1] = "--range=1";
RowCounter.main(args);
fail("should be SecurityException");
} catch (SecurityException e) {
assertEquals(-1, newSecurityManager.getExitCode());
assertTrue(data.toString().contains(
"Please specify range in such format as \"--range=a,b\" or, with only one boundary," +
" \"--range=,b\" or \"--range=a,\""));
assertTrue(data.toString().contains(
"Usage: RowCounter [options] <tablename> [--range=[startKey],[endKey]] " +
"[<column1> <column2>...]"));
}
} finally {
System.setErr(oldPrintStream);
System.setSecurityManager(SECURITY_MANAGER);
}
}
}

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
/**
* Test class TableMapReduceUtil
*/
@Category(LargeTests.class)
public class TestTableMapReduceUtil {
/**
* Test different variants ofinitTableMapperJob method
*/
@Test
public void testInitTableMapperJob() throws Exception {
Configuration configuration = new Configuration();
Job job = new Job(configuration, "tableName");
// test
TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
Text.class, job, false, HLogInputFormat.class);
assertEquals(HLogInputFormat.class, job.getInputFormatClass());
assertEquals(Import.Importer.class, job.getMapperClass());
assertEquals(LongWritable.class, job.getOutputKeyClass());
assertEquals(Text.class, job.getOutputValueClass());
assertNull(job.getCombinerClass());
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
assertEquals(
"org.apache.hadoop.io.serializer.WritableSerialization," +
"org.apache.hadoop.hbase.mapreduce.MutationSerialization," +
"org.apache.hadoop.hbase.mapreduce.ResultSerialization," +
"org.apache.hadoop.hbase.mapreduce.KeyValueSerialization",
job.getConfiguration().get("io.serializations"));
configuration = new Configuration();
job = new Job(configuration, "tableName");
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
Import.Importer.class, Text.class, Text.class, job, false, HLogInputFormat.class);
assertEquals(HLogInputFormat.class, job.getInputFormatClass());
assertEquals(Import.Importer.class, job.getMapperClass());
assertEquals(LongWritable.class, job.getOutputKeyClass());
assertEquals(Text.class, job.getOutputValueClass());
assertNull(job.getCombinerClass());
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
assertEquals(
"org.apache.hadoop.io.serializer.WritableSerialization," +
"org.apache.hadoop.hbase.mapreduce.MutationSerialization," +
"org.apache.hadoop.hbase.mapreduce.ResultSerialization," +
"org.apache.hadoop.hbase.mapreduce.KeyValueSerialization",
job.getConfiguration().get("io.serializations"));
configuration = new Configuration();
job = new Job(configuration, "tableName");
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
Import.Importer.class, Text.class, Text.class, job);
assertEquals(TableInputFormat.class, job.getInputFormatClass());
assertEquals(Import.Importer.class, job.getMapperClass());
assertEquals(LongWritable.class, job.getOutputKeyClass());
assertEquals(Text.class, job.getOutputValueClass());
assertNull(job.getCombinerClass());
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
assertEquals(
"org.apache.hadoop.io.serializer.WritableSerialization," +
"org.apache.hadoop.hbase.mapreduce.MutationSerialization," +
"org.apache.hadoop.hbase.mapreduce.ResultSerialization," +
"org.apache.hadoop.hbase.mapreduce.KeyValueSerialization",
job.getConfiguration().get("io.serializations"));
configuration = new Configuration();
job = new Job(configuration, "tableName");
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
Import.Importer.class, Text.class, Text.class, job, false);
assertEquals(TableInputFormat.class, job.getInputFormatClass());
assertEquals(Import.Importer.class, job.getMapperClass());
assertEquals(LongWritable.class, job.getOutputKeyClass());
assertEquals(Text.class, job.getOutputValueClass());
assertNull(job.getCombinerClass());
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
assertEquals(
"org.apache.hadoop.io.serializer.WritableSerialization," +
"org.apache.hadoop.hbase.mapreduce.MutationSerialization," +
"org.apache.hadoop.hbase.mapreduce.ResultSerialization," +
"org.apache.hadoop.hbase.mapreduce.KeyValueSerialization",
job.getConfiguration().get("io.serializations"));
}
}

View File

@ -19,12 +19,18 @@ package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Delete;
@ -32,12 +38,24 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.WALPlayer.HLogKeyValueMapper;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
/**
* Basic test for the WALPlayer M/R tool
@ -90,14 +108,94 @@ public class TestWALPlayer {
String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
.getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
WALPlayer player = new WALPlayer(TEST_UTIL.getConfiguration());
Configuration configuration= TEST_UTIL.getConfiguration();
WALPlayer player = new WALPlayer(configuration);
String optionName="_test_.name";
configuration.set(optionName, "1000");
player.setupTime(configuration, optionName);
assertEquals(1000,configuration.getLong(optionName,0));
assertEquals(0, player.run(new String[] { walInputDir, Bytes.toString(TABLENAME1),
Bytes.toString(TABLENAME2) }));
// verify the WAL was player into table 2
Get g = new Get(ROW);
Result r = t2.get(g);
assertEquals(1, r.size());
assertTrue(Bytes.equals(COLUMN2, r.raw()[0].getQualifier()));
}
/**
* Test HLogKeyValueMapper setup and map
*/
@Test
public void testHLogKeyValueMapper() throws Exception {
Configuration configuration = new Configuration();
configuration.set(WALPlayer.TABLES_KEY, "table");
HLogKeyValueMapper mapper = new HLogKeyValueMapper();
HLogKey key = mock(HLogKey.class);
when(key.getTablename()).thenReturn(Bytes.toBytes("table"));
@SuppressWarnings("unchecked")
Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context =
mock(Context.class);
when(context.getConfiguration()).thenReturn(configuration);
WALEdit value = mock(WALEdit.class);
List<KeyValue> values = new ArrayList<KeyValue>();
KeyValue kv1 = mock(KeyValue.class);
when(kv1.getFamily()).thenReturn(Bytes.toBytes("family"));
when(kv1.getRow()).thenReturn(Bytes.toBytes("row"));
values.add(kv1);
when(value.getKeyValues()).thenReturn(values);
mapper.setup(context);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
KeyValue key = (KeyValue) invocation.getArguments()[1];
assertEquals("row", Bytes.toString(writer.get()));
assertEquals("row", Bytes.toString(key.getRow()));
return null;
}
}).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
mapper.map(key, value, context);
}
/**
* Test main method
*/
@Test
public void testMainMethod() throws Exception {
PrintStream oldPrintStream = System.err;
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
System.setSecurityManager(newSecurityManager);
ByteArrayOutputStream data = new ByteArrayOutputStream();
String[] args = {};
System.setErr(new PrintStream(data));
try {
System.setErr(new PrintStream(data));
try {
WALPlayer.main(args);
fail("should be SecurityException");
} catch (SecurityException e) {
assertEquals(-1, newSecurityManager.getExitCode());
assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" +
" <tables> [<tableMappings>]"));
assertTrue(data.toString().contains("-Dhlog.bulk.output=/path/for/output"));
}
} finally {
System.setErr(oldPrintStream);
System.setSecurityManager(SECURITY_MANAGER);
}
}
}

View File

@ -0,0 +1,75 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.security.Permission;
/**
* class for masquerading System.exit(int).
* Use for test main method with System.exit(int )
* usage:
* new LauncherSecurityManager();
* try {
* CellCounter.main(args);
* fail("should be exception");
* } catch (SecurityException e) {
* assert(.,e.getExitCode());
* }
*/
public class LauncherSecurityManager extends SecurityManager {
private int exitCode;
private SecurityManager securityManager;
public LauncherSecurityManager() {
reset();
}
@Override
public void checkPermission(Permission perm, Object context) {
if (securityManager != null) {
// check everything with the original SecurityManager
securityManager.checkPermission(perm, context);
}
}
@Override
public void checkPermission(Permission perm) {
if (securityManager != null) {
// check everything with the original SecurityManager
securityManager.checkPermission(perm);
}
}
@Override
public void checkExit(int status) throws SecurityException {
exitCode = status;
throw new SecurityException("Intercepted System.exit(" + status + ")");
}
public int getExitCode() {
return exitCode;
}
public void reset() {
exitCode = 0;
}
}