HBASE-3111 TestTableMapReduce broken up on hudson

M src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
  Changed mentions of 'quorum' to 'ensemble'.
M src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
M src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
  Minor formatting.
M src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
  Removed unused imports and minor formatting.
M src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
  Documented what the quorumAddress parameter is for.
  Removed an unnecessary-looking HBaseConfiguration.addHbaseResources(conf);
  (and adjusted the documentation of the job parameter to say that this
  reduce setup method sets no hbase config on the job).
M src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
  Made this class implement Configurable.  Moved creation of the table out of
  the RecordWriter constructor, where it was based off the passed
  TaskAttemptContext, and into the new setConf method.  Added table and conf
  data members.  (See the illustrative sketch just below this file list.)
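
An illustrative sketch (not part of this commit) of the Configurable contract
the TableOutputFormat change relies on: Hadoop's ReflectionUtils.newInstance
invokes setConf() on any freshly constructed Configurable, e.g. when the
framework materializes the job's output format, which is what lets the HTable
be created once in setConf instead of per RecordWriter. The class and the
property value below are made up for the demo.

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

public class ConfigurableLifecycleSketch implements Configurable {
  private Configuration conf;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    // TableOutputFormat does its once-per-task setup here: reading
    // OUTPUT_TABLE/QUORUM_ADDRESS and creating the HTable.
    System.out.println("setConf called with table " +
        conf.get("hbase.mapred.outputtable"));
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("hbase.mapred.outputtable", "demotable");
    // newInstance() constructs the object and, because the class implements
    // Configurable, immediately invokes setConf(); the same hook fires for
    // TableOutputFormat.
    ReflectionUtils.newInstance(ConfigurableLifecycleSketch.class, conf);
  }
}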


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1022825 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2010-10-15 05:33:11 +00:00
parent a5a00687ef
commit 954efa6e76
6 changed files with 86 additions and 68 deletions

src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java

@@ -69,11 +69,10 @@ extends TableReducer<Writable, Writable, Writable> {
    * @throws InterruptedException When the job gets interrupted.
    */
   @Override
-  public void reduce(Writable key, Iterable<Writable> values,
-      Context context) throws IOException, InterruptedException {
+  public void reduce(Writable key, Iterable<Writable> values, Context context)
+  throws IOException, InterruptedException {
     for(Writable putOrDelete : values) {
       context.write(key, putOrDelete);
     }
   }
 }

src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java

@@ -25,7 +25,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -141,5 +140,4 @@ implements Configurable {
     setScan(scan);
   }
 }
-

src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java

@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -37,7 +36,6 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * A base for {@link TableInputFormat}s. Receives a {@link HTable}, an
@@ -189,8 +187,6 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     return true;
   }
 
   /**
    * Allows subclasses to get the {@link HTable}.
    */
@@ -235,5 +231,4 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   protected void setTableRecordReader(TableRecordReader tableRecordReader) {
     this.tableRecordReader = tableRecordReader;
   }
 }
-

src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java

@@ -26,17 +26,15 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLDecoder;
-import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
@@ -46,7 +44,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Utility for {@link TableMapper} and {@link TableReducer}
@@ -64,13 +61,15 @@ public class TableMapReduceUtil {
    * @param mapper The mapper class to use.
    * @param outputKeyClass The class of the output key.
    * @param outputValueClass The class of the output value.
-   * @param job The current job to adjust.
+   * @param job The current job to adjust. Make sure the passed job is
+   * carrying all necessary HBase configuration.
    * @throws IOException When setting up the details fails.
    */
   public static void initTableMapperJob(String table, Scan scan,
       Class<? extends TableMapper> mapper,
       Class<? extends WritableComparable> outputKeyClass,
-      Class<? extends Writable> outputValueClass, Job job) throws IOException {
+      Class<? extends Writable> outputValueClass, Job job)
+  throws IOException {
     job.setInputFormatClass(TableInputFormat.class);
     if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
     if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
@@ -148,10 +147,18 @@ public class TableMapReduceUtil {
    *
    * @param table The output table.
    * @param reducer The reducer class to use.
-   * @param job The current job to adjust.
+   * @param job The current job to adjust. Make sure the passed job is
+   * carrying all necessary HBase configuration.
    * @param partitioner Partitioner to use. Pass <code>null</code> to use
    * default partitioner.
-   * @param quorumAddress Distant cluster to write to
+   * @param quorumAddress Distant cluster to write to; default is null for
+   * output to the cluster that is designated in <code>hbase-site.xml</code>.
+   * Set this String to the zookeeper ensemble of an alternate remote cluster
+   * when you would have the reduce write to a cluster other than the
+   * default; e.g. when copying tables between clusters, the source would be
+   * designated by <code>hbase-site.xml</code> and this param would have the
+   * ensemble address of the remote cluster. The format to pass is particular.
+   * Pass <code>&lt;hbase.zookeeper.quorum&gt; ':' &lt;ZOOKEEPER_ZNODE_PARENT&gt;</code>.
    * @param serverClass redefined hbase.regionserver.class
    * @param serverImpl redefined hbase.regionserver.impl
    * @throws IOException When determining the region count fails.
@@ -165,12 +172,14 @@ public class TableMapReduceUtil {
     job.setOutputFormatClass(TableOutputFormat.class);
     if (reducer != null) job.setReducerClass(reducer);
     conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
     if (quorumAddress != null) {
       if (quorumAddress.split(":").length == 2) {
         conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
       } else {
-        throw new IOException("Please specify the peer cluster as " +
-            HConstants.ZOOKEEPER_QUORUM+":"+HConstants.ZOOKEEPER_ZNODE_PARENT);
+        // Not in expected format.
+        throw new IOException("Please specify the peer cluster using the format of " +
+            HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_ZNODE_PARENT);
       }
     }
     if (serverClass != null && serverImpl != null) {

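A hedged usage sketch (not from this commit) of the quorumAddress parameter
documented above: copy rows from the local cluster (designated by
hbase-site.xml) into a table on a remote cluster. Table names and the
zookeeper hostnames are made up; the mapper here is a stand-in that re-emits
rows as Puts, as CopyTable-style jobs do.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class CopyToPeerSketch {
  /** Re-emits every row as a Put so TableOutputFormat can write it. */
  static class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context ctx)
        throws IOException, InterruptedException {
      Put put = new Put(row.get());
      for (KeyValue kv : value.raw()) {
        put.add(kv);
      }
      ctx.write(row, put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create(); // job carries hbase config
    Job job = new Job(conf, "copy-to-peer");
    job.setJarByClass(CopyToPeerSketch.class);
    TableMapReduceUtil.initTableMapperJob("sourcetable", new Scan(),
        CopyMapper.class, ImmutableBytesWritable.class, Put.class, job);
    // Peer format: <hbase.zookeeper.quorum> ':' <zookeeper.znode.parent>
    String peer = "zk1.example.com,zk2.example.com,zk3.example.com:/hbase";
    TableMapReduceUtil.initTableReducerJob("targettable", null, job, null,
        peer, null, null);
    job.setNumReduceTasks(0); // map-only write to the peer cluster
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
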
src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java

@@ -23,7 +23,8 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
@@ -34,7 +35,6 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
@@ -43,13 +43,22 @@ import org.apache.hadoop.conf.Configuration;
  *
  * @param <KEY> The type of the key. Ignored in this class.
  */
-public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable> {
+public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable>
+implements Configurable {
   private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
 
   /** Job parameter that specifies the output table. */
   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
 
-  /** Optional job parameter to specify a peer cluster */
+  /**
+   * Optional job parameter to specify a peer cluster.
+   * Used to specify the remote cluster when copying between hbase clusters (the
+   * source is picked up from <code>hbase-site.xml</code>).
+   * @see {@link TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)}
+   */
   public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
 
   /** Optional specification of the rs class name of the peer cluster */
   public static final String
       REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
@@ -57,6 +66,11 @@ public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable> {
   public static final String
       REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
 
+  /** The configuration. */
+  private Configuration conf = null;
+
+  private HTable table;
+
   /**
    * Writes the reducer output to an HBase table.
    *
@@ -120,32 +134,7 @@ public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable> {
   public RecordWriter<KEY, Writable> getRecordWriter(
       TaskAttemptContext context)
   throws IOException, InterruptedException {
-    // expecting exactly one path
-    Configuration conf = new Configuration(context.getConfiguration());
-    String tableName = conf.get(OUTPUT_TABLE);
-    String address = conf.get(QUORUM_ADDRESS);
-    String serverClass = conf.get(REGION_SERVER_CLASS);
-    String serverImpl = conf.get(REGION_SERVER_IMPL);
-    HTable table = null;
-    try {
-      HBaseConfiguration.addHbaseResources(conf);
-      if (address != null) {
-        // Check is done in TMRU
-        String[] parts = address.split(":");
-        conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
-        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
-      }
-      if (serverClass != null) {
-        conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
-        conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
-      }
-      table = new HTable(conf, tableName);
-    } catch(IOException e) {
-      LOG.error(e);
-      throw e;
-    }
-    table.setAutoFlush(false);
-    return new TableRecordWriter<KEY>(table);
+    return new TableRecordWriter<KEY>(this.table);
   }
 
   /**
@@ -178,4 +167,32 @@ public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable> {
     return new TableOutputCommitter();
   }
 
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    String tableName = conf.get(OUTPUT_TABLE);
+    String address = conf.get(QUORUM_ADDRESS);
+    String serverClass = conf.get(REGION_SERVER_CLASS);
+    String serverImpl = conf.get(REGION_SERVER_IMPL);
+    try {
+      if (address != null) {
+        // Check is done in TMRU
+        String[] parts = address.split(":");
+        conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
+        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
+      }
+      if (serverClass != null) {
+        conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
+        conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
+      }
+      this.table = new HTable(conf, tableName);
+      table.setAutoFlush(false);
+      LOG.info("Created table instance for " + tableName);
+    } catch(IOException e) {
+      LOG.error(e);
+    }
+  }
 }
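
A standalone sketch (hostnames made up, no cluster needed) of the rewrite
setConf performs when QUORUM_ADDRESS is present: the single
quorum ':' znode-parent string is split into the two ZooKeeper settings that
the HTable connection code reads. The string keys mirror the HConstants used
above and this runs against hadoop-common alone.

import org.apache.hadoop.conf.Configuration;

public class QuorumAddressSplitSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("hbase.mapred.output.quorum",          // TableOutputFormat.QUORUM_ADDRESS
        "zk1.example.com,zk2.example.com:/hbase");  // hypothetical peer cluster
    String address = conf.get("hbase.mapred.output.quorum");
    // Format check is done in TableMapReduceUtil; assumed valid here.
    String[] parts = address.split(":");
    conf.set("hbase.zookeeper.quorum", parts[0]);   // HConstants.ZOOKEEPER_QUORUM
    conf.set("zookeeper.znode.parent", parts[1]);   // HConstants.ZOOKEEPER_ZNODE_PARENT
    System.out.println(conf.get("hbase.zookeeper.quorum") + " " +
        conf.get("zookeeper.znode.parent"));
  }
}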

src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java

@@ -61,12 +61,12 @@ public class ZKUtil {
   private static final char ZNODE_PATH_SEPARATOR = '/';
 
   /**
-   * Creates a new connection to ZooKeeper, pulling settings and quorum config
+   * Creates a new connection to ZooKeeper, pulling settings and ensemble config
    * from the specified configuration object using methods from {@link ZKConfig}.
    *
    * Sets the connection status monitoring watcher to the specified watcher.
    *
-   * @param conf configuration to pull quorum and other settings from
+   * @param conf configuration to pull ensemble and other settings from
    * @param watcher watcher to monitor connection changes
    * @return connection to zookeeper
    * @throws IOException if unable to connect to zk or config problem
@@ -74,26 +74,26 @@ public class ZKUtil {
   public static ZooKeeper connect(Configuration conf, Watcher watcher)
   throws IOException {
     Properties properties = ZKConfig.makeZKProps(conf);
-    String quorum = ZKConfig.getZKQuorumServersString(properties);
-    return connect(conf, quorum, watcher);
+    String ensemble = ZKConfig.getZKQuorumServersString(properties);
+    return connect(conf, ensemble, watcher);
   }
 
-  public static ZooKeeper connect(Configuration conf, String quorum,
+  public static ZooKeeper connect(Configuration conf, String ensemble,
       Watcher watcher)
   throws IOException {
-    return connect(conf, quorum, watcher, "");
+    return connect(conf, ensemble, watcher, "");
   }
 
-  public static ZooKeeper connect(Configuration conf, String quorum,
+  public static ZooKeeper connect(Configuration conf, String ensemble,
       Watcher watcher, final String descriptor)
   throws IOException {
-    if(quorum == null) {
-      throw new IOException("Unable to determine ZooKeeper quorum");
+    if(ensemble == null) {
+      throw new IOException("Unable to determine ZooKeeper ensemble");
     }
     int timeout = conf.getInt("zookeeper.session.timeout", 60 * 1000);
-    LOG.info(descriptor + " opening connection to ZooKeeper with quorum (" +
-        quorum + ")");
-    return new ZooKeeper(quorum, timeout, watcher);
+    LOG.info(descriptor + " opening connection to ZooKeeper with ensemble (" +
+        ensemble + ")");
+    return new ZooKeeper(ensemble, timeout, watcher);
   }
 
   //
@@ -164,9 +164,9 @@ public class ZKUtil {
    * @return ensemble key with a name (if any)
    */
   public static String getZooKeeperClusterKey(Configuration conf, String name) {
-    String quorum = conf.get(HConstants.ZOOKEEPER_QUORUM.replaceAll(
+    String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM.replaceAll(
         "[\\t\\n\\x0B\\f\\r]", ""));
-    StringBuilder builder = new StringBuilder(quorum);
+    StringBuilder builder = new StringBuilder(ensemble);
     builder.append(":");
     builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
     if (name != null && !name.isEmpty()) {
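
For reference (not part of the commit), a minimal sketch of the ensemble-key
shape the visible part of getZooKeeperClusterKey() assembles, ensemble ':'
znode parent; the configuration values are hypothetical.

import org.apache.hadoop.conf.Configuration;

public class ClusterKeySketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com");
    conf.set("zookeeper.znode.parent", "/hbase");
    StringBuilder builder =
        new StringBuilder(conf.get("hbase.zookeeper.quorum"));
    builder.append(":");
    builder.append(conf.get("zookeeper.znode.parent"));
    // Prints: zk1.example.com,zk2.example.com:/hbase
    System.out.println(builder.toString());
  }
}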