HBASE-9165 [mapreduce] Modularize building dependency jars
- Separate adding HBase and dependencies from adding other job dependencies, and expose it as a separate method that other projects can use (for PIG-3285). - Explicitly add hbase-server to the list of dependencies we ship with the job, for users who extend the classes we provide (see HBASE-9112). - Add integration test for addDependencyJars. - Code reuse for TestTableMapReduce. git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1542341 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
aecabe7518
commit
4b87240aa3
|
@ -0,0 +1,113 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configurable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.IntegrationTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.IntegrationTests;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.util.Tool;
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that we add tmpjars correctly including the named dependencies. Runs
|
||||||
|
* as an integration test so that classpath is realistic.
|
||||||
|
*/
|
||||||
|
@Category(IntegrationTests.class)
|
||||||
|
public class IntegrationTestTableMapReduceUtil implements Configurable, Tool {
|
||||||
|
|
||||||
|
private static IntegrationTestingUtility util;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void provisionCluster() throws Exception {
|
||||||
|
if (null == util) {
|
||||||
|
util = new IntegrationTestingUtility();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void skipMiniCluster() {
|
||||||
|
// test probably also works with a local cluster, but
|
||||||
|
// IntegrationTestingUtility doesn't support this concept.
|
||||||
|
assumeTrue("test requires a distributed cluster.", util.isDistributedCluster());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Look for jars we expect to be on the classpath by name.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testAddDependencyJars() throws Exception {
|
||||||
|
Job job = new Job();
|
||||||
|
TableMapReduceUtil.addDependencyJars(job);
|
||||||
|
String tmpjars = job.getConfiguration().get("tmpjars");
|
||||||
|
|
||||||
|
// verify presence of modules
|
||||||
|
assertTrue(tmpjars.contains("hbase-common"));
|
||||||
|
assertTrue(tmpjars.contains("hbase-protocol"));
|
||||||
|
assertTrue(tmpjars.contains("hbase-client"));
|
||||||
|
assertTrue(tmpjars.contains("hbase-hadoop-compat"));
|
||||||
|
assertTrue(tmpjars.contains("hbase-server"));
|
||||||
|
|
||||||
|
// verify presence of 3rd party dependencies.
|
||||||
|
assertTrue(tmpjars.contains("zookeeper"));
|
||||||
|
assertTrue(tmpjars.contains("netty"));
|
||||||
|
assertTrue(tmpjars.contains("protobuf"));
|
||||||
|
assertTrue(tmpjars.contains("guava"));
|
||||||
|
assertTrue(tmpjars.contains("htrace"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int run(String[] args) throws Exception {
|
||||||
|
provisionCluster();
|
||||||
|
skipMiniCluster();
|
||||||
|
testAddDependencyJars();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setConf(Configuration conf) {
|
||||||
|
if (util != null) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"setConf not supported after the test has been initialized.");
|
||||||
|
}
|
||||||
|
util = new IntegrationTestingUtility(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration getConf() {
|
||||||
|
return util.getConfiguration();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
IntegrationTestingUtility.setUseDistributedCluster(conf);
|
||||||
|
int status = ToolRunner.run(conf, new IntegrationTestTableMapReduceUtil(), args);
|
||||||
|
System.exit(status);
|
||||||
|
}
|
||||||
|
}
|
|
@ -37,12 +37,11 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapred.FileInputFormat;
|
import org.apache.hadoop.mapred.FileInputFormat;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
|
||||||
import org.apache.hadoop.mapred.InputFormat;
|
import org.apache.hadoop.mapred.InputFormat;
|
||||||
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.OutputFormat;
|
import org.apache.hadoop.mapred.OutputFormat;
|
||||||
import org.apache.hadoop.mapred.TextInputFormat;
|
import org.apache.hadoop.mapred.TextInputFormat;
|
||||||
import org.apache.hadoop.mapred.TextOutputFormat;
|
import org.apache.hadoop.mapred.TextOutputFormat;
|
||||||
import org.apache.hadoop.mapred.jobcontrol.Job;
|
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
|
@ -52,7 +51,7 @@ import org.apache.zookeeper.KeeperException;
|
||||||
@Deprecated
|
@Deprecated
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Stable
|
@InterfaceStability.Stable
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||||
public class TableMapReduceUtil {
|
public class TableMapReduceUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -297,15 +296,14 @@ public class TableMapReduceUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(Job)
|
* @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
|
||||||
*/
|
*/
|
||||||
public static void addDependencyJars(JobConf job) throws IOException {
|
public static void addDependencyJars(JobConf job) throws IOException {
|
||||||
|
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
|
||||||
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
|
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
|
||||||
job,
|
job,
|
||||||
org.apache.zookeeper.ZooKeeper.class,
|
// when making changes here, consider also mapreduce.TableMapReduceUtil
|
||||||
org.jboss.netty.channel.ChannelFactory.class,
|
// pull job classes
|
||||||
com.google.common.base.Function.class,
|
|
||||||
com.google.protobuf.Message.class,
|
|
||||||
job.getMapOutputKeyClass(),
|
job.getMapOutputKeyClass(),
|
||||||
job.getMapOutputValueClass(),
|
job.getMapOutputValueClass(),
|
||||||
job.getOutputKeyClass(),
|
job.getOutputKeyClass(),
|
||||||
|
|
|
@ -45,7 +45,7 @@ extends TableMapper<ImmutableBytesWritable, Result> {
|
||||||
* @param job The job configuration.
|
* @param job The job configuration.
|
||||||
* @throws IOException When setting up the job fails.
|
* @throws IOException When setting up the job fails.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("rawtypes")
|
||||||
public static void initJob(String table, Scan scan,
|
public static void initJob(String table, Scan scan,
|
||||||
Class<? extends TableMapper> mapper, Job job) throws IOException {
|
Class<? extends TableMapper> mapper, Job job) throws IOException {
|
||||||
TableMapReduceUtil.initTableMapperJob(table, scan, mapper,
|
TableMapReduceUtil.initTableMapperJob(table, scan, mapper,
|
||||||
|
|
|
@ -71,7 +71,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
/**
|
/**
|
||||||
* Utility for {@link TableMapper} and {@link TableReducer}
|
* Utility for {@link TableMapper} and {@link TableReducer}
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Stable
|
@InterfaceStability.Stable
|
||||||
public class TableMapReduceUtil {
|
public class TableMapReduceUtil {
|
||||||
|
@ -557,25 +557,45 @@ public class TableMapReduceUtil {
|
||||||
job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
|
job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add HBase and its dependencies (only) to the job configuration.
|
||||||
|
* <p>
|
||||||
|
* This is intended as a low-level API, facilitating code reuse between this
|
||||||
|
* class and its mapred counterpart. It also of use to extenral tools that
|
||||||
|
* need to build a MapReduce job that interacts with HBase but want
|
||||||
|
* fine-grained control over the jars shipped to the cluster.
|
||||||
|
* </p>
|
||||||
|
* @param conf The Configuration object to extend with dependencies.
|
||||||
|
* @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
|
||||||
|
* @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
|
||||||
|
*/
|
||||||
|
public static void addHBaseDependencyJars(Configuration conf) throws IOException {
|
||||||
|
addDependencyJars(conf,
|
||||||
|
// explicitly pull a class from each module
|
||||||
|
org.apache.hadoop.hbase.HConstants.class, // hbase-common
|
||||||
|
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
|
||||||
|
org.apache.hadoop.hbase.client.Put.class, // hbase-client
|
||||||
|
org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat
|
||||||
|
org.apache.hadoop.hbase.mapreduce.TableMapper.class, // hbase-server
|
||||||
|
// pull necessary dependencies
|
||||||
|
org.apache.zookeeper.ZooKeeper.class,
|
||||||
|
org.jboss.netty.channel.ChannelFactory.class,
|
||||||
|
com.google.protobuf.Message.class,
|
||||||
|
com.google.common.collect.Lists.class,
|
||||||
|
org.cloudera.htrace.Trace.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add the HBase dependency jars as well as jars for any of the configured
|
* Add the HBase dependency jars as well as jars for any of the configured
|
||||||
* job classes to the job configuration, so that JobClient will ship them
|
* job classes to the job configuration, so that JobClient will ship them
|
||||||
* to the cluster and add them to the DistributedCache.
|
* to the cluster and add them to the DistributedCache.
|
||||||
*/
|
*/
|
||||||
public static void addDependencyJars(Job job) throws IOException {
|
public static void addDependencyJars(Job job) throws IOException {
|
||||||
|
addHBaseDependencyJars(job.getConfiguration());
|
||||||
try {
|
try {
|
||||||
addDependencyJars(job.getConfiguration(),
|
addDependencyJars(job.getConfiguration(),
|
||||||
// explicitly pull a class from each module
|
// when making changes here, consider also mapred.TableMapReduceUtil
|
||||||
org.apache.hadoop.hbase.HConstants.class, // hbase-common
|
|
||||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
|
|
||||||
org.apache.hadoop.hbase.client.Put.class, // hbase-client
|
|
||||||
org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat
|
|
||||||
// pull necessary dependencies
|
|
||||||
org.apache.zookeeper.ZooKeeper.class,
|
|
||||||
org.jboss.netty.channel.ChannelFactory.class,
|
|
||||||
com.google.protobuf.Message.class,
|
|
||||||
com.google.common.collect.Lists.class,
|
|
||||||
org.cloudera.htrace.Trace.class,
|
|
||||||
// pull job classes
|
// pull job classes
|
||||||
job.getMapOutputKeyClass(),
|
job.getMapOutputKeyClass(),
|
||||||
job.getMapOutputValueClass(),
|
job.getMapOutputValueClass(),
|
||||||
|
|
|
@ -18,23 +18,20 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.mapred;
|
package org.apache.hadoop.hbase.mapred;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.NavigableMap;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.hbase.*;
|
import org.apache.hadoop.hbase.LargeTests;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
|
import org.apache.hadoop.hbase.mapreduce.TestTableMapReduceBase;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.mapred.JobClient;
|
import org.apache.hadoop.mapred.JobClient;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
@ -42,101 +39,40 @@ import org.apache.hadoop.mapred.MapReduceBase;
|
||||||
import org.apache.hadoop.mapred.OutputCollector;
|
import org.apache.hadoop.mapred.OutputCollector;
|
||||||
import org.apache.hadoop.mapred.Reporter;
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
import org.apache.hadoop.mapred.RunningJob;
|
import org.apache.hadoop.mapred.RunningJob;
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test Map/Reduce job over HBase tables. The map/reduce process we're testing
|
* Test Map/Reduce job over HBase tables. The map/reduce process we're testing
|
||||||
* on our tables is simple - take every row in the table, reverse the value of
|
* on our tables is simple - take every row in the table, reverse the value of
|
||||||
* a particular cell, and write it back to the table.
|
* a particular cell, and write it back to the table.
|
||||||
*/
|
*/
|
||||||
@Category(LargeTests.class)
|
@Category(LargeTests.class)
|
||||||
public class TestTableMapReduce {
|
@SuppressWarnings("deprecation")
|
||||||
|
public class TestTableMapReduce extends TestTableMapReduceBase {
|
||||||
private static final Log LOG =
|
private static final Log LOG =
|
||||||
LogFactory.getLog(TestTableMapReduce.class.getName());
|
LogFactory.getLog(TestTableMapReduce.class.getName());
|
||||||
private static final HBaseTestingUtility UTIL =
|
|
||||||
new HBaseTestingUtility();
|
|
||||||
static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
|
|
||||||
static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
|
|
||||||
static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
|
|
||||||
|
|
||||||
private static final byte [][] columns = new byte [][] {
|
protected Log getLog() { return LOG; }
|
||||||
INPUT_FAMILY,
|
|
||||||
OUTPUT_FAMILY
|
|
||||||
};
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void beforeClass() throws Exception {
|
|
||||||
UTIL.startMiniCluster();
|
|
||||||
HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY});
|
|
||||||
UTIL.createMultiRegions(table, INPUT_FAMILY);
|
|
||||||
UTIL.loadTable(table, INPUT_FAMILY);
|
|
||||||
UTIL.startMiniMapReduceCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void afterClass() throws Exception {
|
|
||||||
UTIL.shutdownMiniMapReduceCluster();
|
|
||||||
UTIL.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pass the given key and processed record reduce
|
* Pass the given key and processed record reduce
|
||||||
*/
|
*/
|
||||||
public static class ProcessContentsMapper
|
static class ProcessContentsMapper extends MapReduceBase implements
|
||||||
extends MapReduceBase
|
TableMap<ImmutableBytesWritable, Put> {
|
||||||
implements TableMap<ImmutableBytesWritable, Put> {
|
|
||||||
/**
|
/**
|
||||||
* Pass the key, and reversed value to reduce
|
* Pass the key, and reversed value to reduce
|
||||||
* @param key
|
|
||||||
* @param value
|
|
||||||
* @param output
|
|
||||||
* @param reporter
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
*/
|
||||||
public void map(ImmutableBytesWritable key, Result value,
|
public void map(ImmutableBytesWritable key, Result value,
|
||||||
OutputCollector<ImmutableBytesWritable, Put> output,
|
OutputCollector<ImmutableBytesWritable, Put> output,
|
||||||
Reporter reporter)
|
Reporter reporter)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (value.size() != 1) {
|
output.collect(key, TestTableMapReduceBase.map(key, value));
|
||||||
throw new IOException("There should only be one input column");
|
|
||||||
}
|
|
||||||
Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
|
|
||||||
cf = value.getMap();
|
|
||||||
if(!cf.containsKey(INPUT_FAMILY)) {
|
|
||||||
throw new IOException("Wrong input columns. Missing: '" +
|
|
||||||
Bytes.toString(INPUT_FAMILY) + "'.");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the original value and reverse it
|
|
||||||
|
|
||||||
String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
|
|
||||||
StringBuilder newValue = new StringBuilder(originalValue);
|
|
||||||
newValue.reverse();
|
|
||||||
|
|
||||||
// Now set the value to be collected
|
|
||||||
|
|
||||||
Put outval = new Put(key.get());
|
|
||||||
outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
|
|
||||||
output.collect(key, outval);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* Test a map/reduce against a multi-region table
|
protected void runTestOnTable(HTable table) throws IOException {
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testMultiRegionTable() throws IOException {
|
|
||||||
runTestOnTable(new HTable(UTIL.getConfiguration(), MULTI_REGION_TABLE_NAME));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void runTestOnTable(HTable table) throws IOException {
|
|
||||||
JobConf jobConf = null;
|
JobConf jobConf = null;
|
||||||
try {
|
try {
|
||||||
LOG.info("Before map/reduce startup");
|
LOG.info("Before map/reduce startup");
|
||||||
|
@ -162,102 +98,5 @@ public class TestTableMapReduce {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verify(String tableName) throws IOException {
|
|
||||||
HTable table = new HTable(UTIL.getConfiguration(), tableName);
|
|
||||||
boolean verified = false;
|
|
||||||
long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
|
|
||||||
int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
|
|
||||||
for (int i = 0; i < numRetries; i++) {
|
|
||||||
try {
|
|
||||||
LOG.info("Verification attempt #" + i);
|
|
||||||
verifyAttempt(table);
|
|
||||||
verified = true;
|
|
||||||
break;
|
|
||||||
} catch (NullPointerException e) {
|
|
||||||
// If here, a cell was empty. Presume its because updates came in
|
|
||||||
// after the scanner had been opened. Wait a while and retry.
|
|
||||||
LOG.debug("Verification attempt failed: " + e.getMessage());
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
Thread.sleep(pause);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assertTrue(verified);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Looks at every value of the mapreduce output and verifies that indeed
|
|
||||||
* the values have been reversed.
|
|
||||||
* @param table Table to scan.
|
|
||||||
* @throws IOException
|
|
||||||
* @throws NullPointerException if we failed to find a cell value
|
|
||||||
*/
|
|
||||||
private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
|
|
||||||
Scan scan = new Scan();
|
|
||||||
TableInputFormat.addColumns(scan, columns);
|
|
||||||
ResultScanner scanner = table.getScanner(scan);
|
|
||||||
try {
|
|
||||||
Iterator<Result> itr = scanner.iterator();
|
|
||||||
assertTrue(itr.hasNext());
|
|
||||||
while(itr.hasNext()) {
|
|
||||||
Result r = itr.next();
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
if (r.size() > 2 ) {
|
|
||||||
throw new IOException("Too many results, expected 2 got " +
|
|
||||||
r.size());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
byte[] firstValue = null;
|
|
||||||
byte[] secondValue = null;
|
|
||||||
int count = 0;
|
|
||||||
for(Cell kv : r.listCells()) {
|
|
||||||
if (count == 0) {
|
|
||||||
firstValue = CellUtil.cloneValue(kv);
|
|
||||||
}
|
|
||||||
if (count == 1) {
|
|
||||||
secondValue = CellUtil.cloneValue(kv);;
|
|
||||||
}
|
|
||||||
count++;
|
|
||||||
if (count == 2) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
String first = "";
|
|
||||||
if (firstValue == null) {
|
|
||||||
throw new NullPointerException(Bytes.toString(r.getRow()) +
|
|
||||||
": first value is null");
|
|
||||||
}
|
|
||||||
first = Bytes.toString(firstValue);
|
|
||||||
|
|
||||||
String second = "";
|
|
||||||
if (secondValue == null) {
|
|
||||||
throw new NullPointerException(Bytes.toString(r.getRow()) +
|
|
||||||
": second value is null");
|
|
||||||
}
|
|
||||||
byte[] secondReversed = new byte[secondValue.length];
|
|
||||||
for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
|
|
||||||
secondReversed[i] = secondValue[j];
|
|
||||||
}
|
|
||||||
second = Bytes.toString(secondReversed);
|
|
||||||
|
|
||||||
if (first.compareTo(second) != 0) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("second key is not the reverse of first. row=" +
|
|
||||||
r.getRow() + ", first value=" + first + ", second value=" +
|
|
||||||
second);
|
|
||||||
}
|
|
||||||
fail();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
scanner.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,34 +58,15 @@ import org.junit.experimental.categories.Category;
|
||||||
* a particular cell, and write it back to the table.
|
* a particular cell, and write it back to the table.
|
||||||
*/
|
*/
|
||||||
@Category(LargeTests.class)
|
@Category(LargeTests.class)
|
||||||
public class TestTableMapReduce {
|
public class TestTableMapReduce extends TestTableMapReduceBase {
|
||||||
private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
|
private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
|
||||||
private static final HBaseTestingUtility UTIL =
|
|
||||||
new HBaseTestingUtility();
|
|
||||||
static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
|
|
||||||
static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
|
|
||||||
static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
|
|
||||||
|
|
||||||
@BeforeClass
|
protected Log getLog() { return LOG; }
|
||||||
public static void beforeClass() throws Exception {
|
|
||||||
UTIL.startMiniCluster();
|
|
||||||
HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY});
|
|
||||||
UTIL.createMultiRegions(table, INPUT_FAMILY);
|
|
||||||
UTIL.loadTable(table, INPUT_FAMILY);
|
|
||||||
UTIL.startMiniMapReduceCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void afterClass() throws Exception {
|
|
||||||
UTIL.shutdownMiniMapReduceCluster();
|
|
||||||
UTIL.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pass the given key and processed record reduce
|
* Pass the given key and processed record reduce
|
||||||
*/
|
*/
|
||||||
public static class ProcessContentsMapper
|
static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {
|
||||||
extends TableMapper<ImmutableBytesWritable, Put> {
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pass the key, and reversed value to reduce
|
* Pass the key, and reversed value to reduce
|
||||||
|
@ -119,30 +100,7 @@ public class TestTableMapReduce {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
protected void runTestOnTable(HTable table) throws IOException {
|
||||||
* Test a map/reduce against a multi-region table
|
|
||||||
* @throws IOException
|
|
||||||
* @throws ClassNotFoundException
|
|
||||||
* @throws InterruptedException
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testMultiRegionTable()
|
|
||||||
throws IOException, InterruptedException, ClassNotFoundException {
|
|
||||||
runTestOnTable(new HTable(new Configuration(UTIL.getConfiguration()),
|
|
||||||
MULTI_REGION_TABLE_NAME));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testCombiner()
|
|
||||||
throws IOException, InterruptedException, ClassNotFoundException {
|
|
||||||
Configuration conf = new Configuration(UTIL.getConfiguration());
|
|
||||||
// force use of combiner for testing purposes
|
|
||||||
conf.setInt("min.num.spills.for.combine", 1);
|
|
||||||
runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void runTestOnTable(HTable table)
|
|
||||||
throws IOException, InterruptedException, ClassNotFoundException {
|
|
||||||
Job job = null;
|
Job job = null;
|
||||||
try {
|
try {
|
||||||
LOG.info("Before map/reduce startup");
|
LOG.info("Before map/reduce startup");
|
||||||
|
@ -164,6 +122,10 @@ public class TestTableMapReduce {
|
||||||
|
|
||||||
// verify map-reduce results
|
// verify map-reduce results
|
||||||
verify(Bytes.toString(table.getTableName()));
|
verify(Bytes.toString(table.getTableName()));
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new IOException(e);
|
||||||
} finally {
|
} finally {
|
||||||
table.close();
|
table.close();
|
||||||
if (job != null) {
|
if (job != null) {
|
||||||
|
@ -172,123 +134,4 @@ public class TestTableMapReduce {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verify(String tableName) throws IOException {
|
|
||||||
HTable table = new HTable(new Configuration(UTIL.getConfiguration()), tableName);
|
|
||||||
boolean verified = false;
|
|
||||||
long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
|
|
||||||
int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
|
|
||||||
for (int i = 0; i < numRetries; i++) {
|
|
||||||
try {
|
|
||||||
LOG.info("Verification attempt #" + i);
|
|
||||||
verifyAttempt(table);
|
|
||||||
verified = true;
|
|
||||||
break;
|
|
||||||
} catch (NullPointerException e) {
|
|
||||||
// If here, a cell was empty. Presume its because updates came in
|
|
||||||
// after the scanner had been opened. Wait a while and retry.
|
|
||||||
LOG.debug("Verification attempt failed: " + e.getMessage());
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
Thread.sleep(pause);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assertTrue(verified);
|
|
||||||
table.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Looks at every value of the mapreduce output and verifies that indeed
|
|
||||||
* the values have been reversed.
|
|
||||||
*
|
|
||||||
* @param table Table to scan.
|
|
||||||
* @throws IOException
|
|
||||||
* @throws NullPointerException if we failed to find a cell value
|
|
||||||
*/
|
|
||||||
private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
|
|
||||||
Scan scan = new Scan();
|
|
||||||
scan.addFamily(INPUT_FAMILY);
|
|
||||||
scan.addFamily(OUTPUT_FAMILY);
|
|
||||||
ResultScanner scanner = table.getScanner(scan);
|
|
||||||
try {
|
|
||||||
Iterator<Result> itr = scanner.iterator();
|
|
||||||
assertTrue(itr.hasNext());
|
|
||||||
while(itr.hasNext()) {
|
|
||||||
Result r = itr.next();
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
if (r.size() > 2 ) {
|
|
||||||
throw new IOException("Too many results, expected 2 got " +
|
|
||||||
r.size());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
byte[] firstValue = null;
|
|
||||||
byte[] secondValue = null;
|
|
||||||
int count = 0;
|
|
||||||
for(Cell kv : r.listCells()) {
|
|
||||||
if (count == 0) {
|
|
||||||
firstValue = CellUtil.cloneValue(kv);
|
|
||||||
}
|
|
||||||
if (count == 1) {
|
|
||||||
secondValue = CellUtil.cloneValue(kv);
|
|
||||||
}
|
|
||||||
count++;
|
|
||||||
if (count == 2) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
String first = "";
|
|
||||||
if (firstValue == null) {
|
|
||||||
throw new NullPointerException(Bytes.toString(r.getRow()) +
|
|
||||||
": first value is null");
|
|
||||||
}
|
|
||||||
first = Bytes.toString(firstValue);
|
|
||||||
|
|
||||||
String second = "";
|
|
||||||
if (secondValue == null) {
|
|
||||||
throw new NullPointerException(Bytes.toString(r.getRow()) +
|
|
||||||
": second value is null");
|
|
||||||
}
|
|
||||||
byte[] secondReversed = new byte[secondValue.length];
|
|
||||||
for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
|
|
||||||
secondReversed[i] = secondValue[j];
|
|
||||||
}
|
|
||||||
second = Bytes.toString(secondReversed);
|
|
||||||
|
|
||||||
if (first.compareTo(second) != 0) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("second key is not the reverse of first. row=" +
|
|
||||||
Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
|
|
||||||
", second value=" + second);
|
|
||||||
}
|
|
||||||
fail();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
scanner.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test that we add tmpjars correctly including the ZK jar.
|
|
||||||
*/
|
|
||||||
public void testAddDependencyJars() throws Exception {
|
|
||||||
Job job = new Job();
|
|
||||||
TableMapReduceUtil.addDependencyJars(job);
|
|
||||||
String tmpjars = job.getConfiguration().get("tmpjars");
|
|
||||||
|
|
||||||
System.err.println("tmpjars: " + tmpjars);
|
|
||||||
assertTrue(tmpjars.contains("zookeeper"));
|
|
||||||
assertFalse(tmpjars.contains("guava"));
|
|
||||||
|
|
||||||
System.err.println("appending guava jar");
|
|
||||||
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
|
|
||||||
com.google.common.base.Function.class);
|
|
||||||
tmpjars = job.getConfiguration().get("tmpjars");
|
|
||||||
assertTrue(tmpjars.contains("guava"));
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,227 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.NavigableMap;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A base class for a test Map/Reduce job over HBase tables. The map/reduce process we're testing
|
||||||
|
* on our tables is simple - take every row in the table, reverse the value of a particular cell,
|
||||||
|
* and write it back to the table. Implements common components between mapred and mapreduce
|
||||||
|
* implementations.
|
||||||
|
*/
|
||||||
|
public abstract class TestTableMapReduceBase {
|
||||||
|
|
||||||
|
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
protected static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
|
||||||
|
protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
|
||||||
|
protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
|
||||||
|
|
||||||
|
protected static final byte[][] columns = new byte[][] {
|
||||||
|
INPUT_FAMILY,
|
||||||
|
OUTPUT_FAMILY
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve my logger instance.
|
||||||
|
*/
|
||||||
|
protected abstract Log getLog();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles API-specifics for setting up and executing the job.
|
||||||
|
*/
|
||||||
|
protected abstract void runTestOnTable(HTable table) throws IOException;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void beforeClass() throws Exception {
|
||||||
|
UTIL.startMiniCluster();
|
||||||
|
HTable table =
|
||||||
|
UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
|
||||||
|
UTIL.createMultiRegions(table, INPUT_FAMILY);
|
||||||
|
UTIL.loadTable(table, INPUT_FAMILY);
|
||||||
|
UTIL.startMiniMapReduceCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void afterClass() throws Exception {
|
||||||
|
UTIL.shutdownMiniMapReduceCluster();
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test a map/reduce against a multi-region table
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMultiRegionTable() throws IOException {
|
||||||
|
runTestOnTable(new HTable(UTIL.getConfiguration(), MULTI_REGION_TABLE_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCombiner() throws IOException {
|
||||||
|
Configuration conf = new Configuration(UTIL.getConfiguration());
|
||||||
|
// force use of combiner for testing purposes
|
||||||
|
conf.setInt("min.num.spills.for.combine", 1);
|
||||||
|
runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implements mapper logic for use across APIs.
|
||||||
|
*/
|
||||||
|
protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
|
||||||
|
if (value.size() != 1) {
|
||||||
|
throw new IOException("There should only be one input column");
|
||||||
|
}
|
||||||
|
Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
|
||||||
|
cf = value.getMap();
|
||||||
|
if(!cf.containsKey(INPUT_FAMILY)) {
|
||||||
|
throw new IOException("Wrong input columns. Missing: '" +
|
||||||
|
Bytes.toString(INPUT_FAMILY) + "'.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the original value and reverse it
|
||||||
|
|
||||||
|
String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
|
||||||
|
StringBuilder newValue = new StringBuilder(originalValue);
|
||||||
|
newValue.reverse();
|
||||||
|
|
||||||
|
// Now set the value to be collected
|
||||||
|
|
||||||
|
Put outval = new Put(key.get());
|
||||||
|
outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
|
||||||
|
return outval;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void verify(String tableName) throws IOException {
|
||||||
|
HTable table = new HTable(UTIL.getConfiguration(), tableName);
|
||||||
|
boolean verified = false;
|
||||||
|
long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
|
||||||
|
int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
|
||||||
|
for (int i = 0; i < numRetries; i++) {
|
||||||
|
try {
|
||||||
|
getLog().info("Verification attempt #" + i);
|
||||||
|
verifyAttempt(table);
|
||||||
|
verified = true;
|
||||||
|
break;
|
||||||
|
} catch (NullPointerException e) {
|
||||||
|
// If here, a cell was empty. Presume its because updates came in
|
||||||
|
// after the scanner had been opened. Wait a while and retry.
|
||||||
|
getLog().debug("Verification attempt failed: " + e.getMessage());
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(pause);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue(verified);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Looks at every value of the mapreduce output and verifies that indeed
|
||||||
|
* the values have been reversed.
|
||||||
|
* @param table Table to scan.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws NullPointerException if we failed to find a cell value
|
||||||
|
*/
|
||||||
|
private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
|
||||||
|
Scan scan = new Scan();
|
||||||
|
TableInputFormat.addColumns(scan, columns);
|
||||||
|
ResultScanner scanner = table.getScanner(scan);
|
||||||
|
try {
|
||||||
|
Iterator<Result> itr = scanner.iterator();
|
||||||
|
assertTrue(itr.hasNext());
|
||||||
|
while(itr.hasNext()) {
|
||||||
|
Result r = itr.next();
|
||||||
|
if (getLog().isDebugEnabled()) {
|
||||||
|
if (r.size() > 2 ) {
|
||||||
|
throw new IOException("Too many results, expected 2 got " +
|
||||||
|
r.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
byte[] firstValue = null;
|
||||||
|
byte[] secondValue = null;
|
||||||
|
int count = 0;
|
||||||
|
for(Cell kv : r.listCells()) {
|
||||||
|
if (count == 0) {
|
||||||
|
firstValue = CellUtil.cloneValue(kv);
|
||||||
|
}
|
||||||
|
if (count == 1) {
|
||||||
|
secondValue = CellUtil.cloneValue(kv);
|
||||||
|
}
|
||||||
|
count++;
|
||||||
|
if (count == 2) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (firstValue == null) {
|
||||||
|
throw new NullPointerException(Bytes.toString(r.getRow()) +
|
||||||
|
": first value is null");
|
||||||
|
}
|
||||||
|
String first = Bytes.toString(firstValue);
|
||||||
|
|
||||||
|
if (secondValue == null) {
|
||||||
|
throw new NullPointerException(Bytes.toString(r.getRow()) +
|
||||||
|
": second value is null");
|
||||||
|
}
|
||||||
|
byte[] secondReversed = new byte[secondValue.length];
|
||||||
|
for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
|
||||||
|
secondReversed[i] = secondValue[j];
|
||||||
|
}
|
||||||
|
String second = Bytes.toString(secondReversed);
|
||||||
|
|
||||||
|
if (first.compareTo(second) != 0) {
|
||||||
|
if (getLog().isDebugEnabled()) {
|
||||||
|
getLog().debug("second key is not the reverse of first. row=" +
|
||||||
|
Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
|
||||||
|
", second value=" + second);
|
||||||
|
}
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -15,8 +15,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.LargeTests;
|
import org.apache.hadoop.hbase.SmallTests;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.io.LongWritable;
|
import org.apache.hadoop.io.LongWritable;
|
||||||
|
@ -24,19 +27,15 @@ import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import static org.junit.Assert.*;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test class TableMapReduceUtil
|
* Test different variants of initTableMapperJob method
|
||||||
*/
|
*/
|
||||||
|
@Category(SmallTests.class)
|
||||||
@Category(LargeTests.class)
|
|
||||||
public class TestTableMapReduceUtil {
|
public class TestTableMapReduceUtil {
|
||||||
/**
|
|
||||||
* Test different variants ofinitTableMapperJob method
|
@Test
|
||||||
*/
|
public void testInitTableMapperJob1() throws Exception {
|
||||||
@Test (timeout=600000)
|
|
||||||
public void testInitTableMapperJob() throws Exception {
|
|
||||||
Configuration configuration = new Configuration();
|
Configuration configuration = new Configuration();
|
||||||
Job job = new Job(configuration, "tableName");
|
Job job = new Job(configuration, "tableName");
|
||||||
// test
|
// test
|
||||||
|
@ -48,9 +47,12 @@ public class TestTableMapReduceUtil {
|
||||||
assertEquals(Text.class, job.getOutputValueClass());
|
assertEquals(Text.class, job.getOutputValueClass());
|
||||||
assertNull(job.getCombinerClass());
|
assertNull(job.getCombinerClass());
|
||||||
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
|
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
|
||||||
|
}
|
||||||
|
|
||||||
configuration = new Configuration();
|
@Test
|
||||||
job = new Job(configuration, "tableName");
|
public void testInitTableMapperJob2() throws Exception {
|
||||||
|
Configuration configuration = new Configuration();
|
||||||
|
Job job = new Job(configuration, "tableName");
|
||||||
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
|
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
|
||||||
Import.Importer.class, Text.class, Text.class, job, false, HLogInputFormat.class);
|
Import.Importer.class, Text.class, Text.class, job, false, HLogInputFormat.class);
|
||||||
assertEquals(HLogInputFormat.class, job.getInputFormatClass());
|
assertEquals(HLogInputFormat.class, job.getInputFormatClass());
|
||||||
|
@ -59,9 +61,12 @@ public class TestTableMapReduceUtil {
|
||||||
assertEquals(Text.class, job.getOutputValueClass());
|
assertEquals(Text.class, job.getOutputValueClass());
|
||||||
assertNull(job.getCombinerClass());
|
assertNull(job.getCombinerClass());
|
||||||
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
|
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
|
||||||
|
}
|
||||||
|
|
||||||
configuration = new Configuration();
|
@Test
|
||||||
job = new Job(configuration, "tableName");
|
public void testInitTableMapperJob3() throws Exception {
|
||||||
|
Configuration configuration = new Configuration();
|
||||||
|
Job job = new Job(configuration, "tableName");
|
||||||
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
|
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
|
||||||
Import.Importer.class, Text.class, Text.class, job);
|
Import.Importer.class, Text.class, Text.class, job);
|
||||||
assertEquals(TableInputFormat.class, job.getInputFormatClass());
|
assertEquals(TableInputFormat.class, job.getInputFormatClass());
|
||||||
|
@ -70,9 +75,12 @@ public class TestTableMapReduceUtil {
|
||||||
assertEquals(Text.class, job.getOutputValueClass());
|
assertEquals(Text.class, job.getOutputValueClass());
|
||||||
assertNull(job.getCombinerClass());
|
assertNull(job.getCombinerClass());
|
||||||
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
|
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
|
||||||
|
}
|
||||||
|
|
||||||
configuration = new Configuration();
|
@Test
|
||||||
job = new Job(configuration, "tableName");
|
public void testInitTableMapperJob4() throws Exception {
|
||||||
|
Configuration configuration = new Configuration();
|
||||||
|
Job job = new Job(configuration, "tableName");
|
||||||
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
|
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
|
||||||
Import.Importer.class, Text.class, Text.class, job, false);
|
Import.Importer.class, Text.class, Text.class, job, false);
|
||||||
assertEquals(TableInputFormat.class, job.getInputFormatClass());
|
assertEquals(TableInputFormat.class, job.getInputFormatClass());
|
||||||
|
@ -82,4 +90,4 @@ public class TestTableMapReduceUtil {
|
||||||
assertNull(job.getCombinerClass());
|
assertNull(job.getCombinerClass());
|
||||||
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
|
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue