HBASE-2136 Forward-port the old mapred package
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@899845 13f79535-47bb-0310-9956-ffa450edef68
parent 9b42d8a966
commit b145b4b83e
@@ -304,6 +304,7 @@ Release 0.21.0 - Unreleased
   HBASE-2107  Upgrading Lucene 2.2 to Lucene 3.0.0 (Kay Kay via Stack)
   HBASE-2111  Move to ivy broke our being able to run in-place; i.e.
               ./bin/start-hbase.sh in a checkout
+  HBASE-2136  Forward-port the old mapred package

 NEW FEATURES
   HBASE-1961  HBase EC2 scripts

@@ -0,0 +1,206 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

/**
 * Example table column indexing class. Runs a mapreduce job to index
 * specified table columns.
 * <ul><li>Each row is modeled as a Lucene document: row key is indexed in
 * its untokenized form, column name-value pairs are Lucene field name-value
 * pairs.</li>
 * <li>A file passed on command line is used to populate an
 * {@link IndexConfiguration} which is used to set various Lucene parameters,
 * specify whether to optimize an index and which columns to index and/or
 * store, in tokenized or untokenized form, etc. For an example, see the
 * <code>createIndexConfContent</code> method in TestTableIndex
 * </li>
 * <li>The number of reduce tasks decides the number of indexes (partitions).
 * The index(es) is stored in the output path of job configuration.</li>
 * <li>The index build process is done in the reduce phase. Users can use
 * the map phase to join rows from different tables or to pre-parse/analyze
 * column content, etc.</li>
 * </ul>
 */
@Deprecated
public class BuildTableIndex {
  private static final String USAGE = "Usage: BuildTableIndex " +
    "-m <numMapTasks> -r <numReduceTasks>\n -indexConf <iconfFile> " +
    "-indexDir <indexDir>\n -table <tableName> -columns <columnName1> " +
    "[<columnName2> ...]";

  private static void printUsage(String message) {
    System.err.println(message);
    System.err.println(USAGE);
    System.exit(-1);
  }

  /** default constructor */
  public BuildTableIndex() {
    super();
  }

  /**
   * @param args
   * @throws IOException
   */
  public void run(String[] args) throws IOException {
    if (args.length < 6) {
      printUsage("Too few arguments");
    }

    int numMapTasks = 1;
    int numReduceTasks = 1;
    String iconfFile = null;
    String indexDir = null;
    String tableName = null;
    StringBuffer columnNames = null;

    // parse args
    for (int i = 0; i < args.length - 1; i++) {
      if ("-m".equals(args[i])) {
        numMapTasks = Integer.parseInt(args[++i]);
      } else if ("-r".equals(args[i])) {
        numReduceTasks = Integer.parseInt(args[++i]);
      } else if ("-indexConf".equals(args[i])) {
        iconfFile = args[++i];
      } else if ("-indexDir".equals(args[i])) {
        indexDir = args[++i];
      } else if ("-table".equals(args[i])) {
        tableName = args[++i];
      } else if ("-columns".equals(args[i])) {
        columnNames = new StringBuffer(args[++i]);
        while (i + 1 < args.length && !args[i + 1].startsWith("-")) {
          columnNames.append(" ");
          columnNames.append(args[++i]);
        }
      } else {
        printUsage("Unsupported option " + args[i]);
      }
    }

    if (indexDir == null || tableName == null || columnNames == null) {
      printUsage("Index directory, table name and at least one column must " +
        "be specified");
    }

    Configuration conf = new HBaseConfiguration();
    if (iconfFile != null) {
      // set index configuration content from a file
      String content = readContent(iconfFile);
      IndexConfiguration iconf = new IndexConfiguration();
      // purely to validate, exception will be thrown if not valid
      iconf.addFromXML(content);
      conf.set("hbase.index.conf", content);
    }

    if (columnNames != null) {
      JobConf jobConf = createJob(conf, numMapTasks, numReduceTasks, indexDir,
        tableName, columnNames.toString());
      JobClient.runJob(jobConf);
    }
  }

  /**
   * @param conf
   * @param numMapTasks
   * @param numReduceTasks
   * @param indexDir
   * @param tableName
   * @param columnNames
   * @return JobConf
   */
  public JobConf createJob(Configuration conf, int numMapTasks,
      int numReduceTasks, String indexDir, String tableName,
      String columnNames) {
    JobConf jobConf = new JobConf(conf, BuildTableIndex.class);
    jobConf.setJobName("build index for table " + tableName);
    jobConf.setNumMapTasks(numMapTasks);
    // number of indexes to partition into
    jobConf.setNumReduceTasks(numReduceTasks);

    // use identity map (a waste, but just as an example)
    IdentityTableMap.initJob(tableName, columnNames, IdentityTableMap.class,
      jobConf);

    // use IndexTableReduce to build a Lucene index
    jobConf.setReducerClass(IndexTableReduce.class);
    FileOutputFormat.setOutputPath(jobConf, new Path(indexDir));
    jobConf.setOutputFormat(IndexOutputFormat.class);

    return jobConf;
  }

  /*
   * Read xml file of indexing configurations. The xml format is similar to
   * hbase-default.xml and hadoop-default.xml. For an example configuration,
   * see the <code>createIndexConfContent</code> method in TestTableIndex
   * @param fileName File to read.
   * @return XML configuration read from file
   * @throws IOException
   */
  private String readContent(String fileName) throws IOException {
    File file = new File(fileName);
    int length = (int) file.length();
    if (length == 0) {
      printUsage("Index configuration file " + fileName + " does not exist");
    }

    int bytesRead = 0;
    byte[] bytes = new byte[length];
    FileInputStream fis = new FileInputStream(file);

    try {
      // read entire file into content
      while (bytesRead < length) {
        int read = fis.read(bytes, bytesRead, length - bytesRead);
        if (read > 0) {
          bytesRead += read;
        } else {
          break;
        }
      }
    } finally {
      fis.close();
    }

    return new String(bytes, 0, bytesRead, HConstants.UTF8_ENCODING);
  }

  /**
   * @param args
   * @throws IOException
   */
  public static void main(String[] args) throws IOException {
    BuildTableIndex build = new BuildTableIndex();
    build.run(args);
  }
}
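A minimal sketch of driving BuildTableIndex programmatically instead of through main(), assuming a hypothetical table "webtable", hypothetical columns, and a scratch output directory; it uses only the createJob API added above.

    // Hypothetical usage sketch (not part of the commit).
    Configuration conf = new HBaseConfiguration();
    BuildTableIndex builder = new BuildTableIndex();
    JobConf job = builder.createJob(conf,
        1,                              // numMapTasks
        4,                              // numReduceTasks == number of index partitions
        "/tmp/webtable-index",          // hypothetical output directory
        "webtable",                     // hypothetical table name
        "contents:text anchor:label");  // hypothetical space-separated columns
    JobClient.runJob(job);              // submit and wait, same as run() does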
@@ -0,0 +1,40 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.util.ProgramDriver;

/**
 * Driver for hbase mapreduce jobs. Select which to run by passing
 * name of job to this main.
 */
@Deprecated
public class Driver {
  /**
   * @param args
   * @throws Throwable
   */
  public static void main(String[] args) throws Throwable {
    ProgramDriver pgd = new ProgramDriver();
    pgd.addClass(RowCounter.NAME, RowCounter.class,
      "Count rows in HBase table");
    pgd.driver(args);
  }
}
@@ -0,0 +1,162 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Map;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Extract grouping columns from input record
 */
@Deprecated
public class GroupingTableMap
extends MapReduceBase
implements TableMap<ImmutableBytesWritable,Result> {

  /**
   * JobConf parameter to specify the columns used to produce the key passed to
   * collect from the map phase
   */
  public static final String GROUP_COLUMNS =
    "hbase.mapred.groupingtablemap.columns";

  protected byte [][] columns;

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the
   * JobConf.
   *
   * @param table table to be processed
   * @param columns space separated list of columns to fetch
   * @param groupColumns space separated list of columns used to form the key
   * used in collect
   * @param mapper map class
   * @param job job configuration object
   */
  @SuppressWarnings("unchecked")
  public static void initJob(String table, String columns, String groupColumns,
    Class<? extends TableMap> mapper, JobConf job) {

    TableMapReduceUtil.initTableMapJob(table, columns, mapper,
        ImmutableBytesWritable.class, Result.class, job);
    job.set(GROUP_COLUMNS, groupColumns);
  }

  @Override
  public void configure(JobConf job) {
    super.configure(job);
    String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
    columns = new byte[cols.length][];
    for(int i = 0; i < cols.length; i++) {
      columns[i] = Bytes.toBytes(cols[i]);
    }
  }

  /**
   * Extract the grouping columns from value to construct a new key.
   *
   * Pass the new key and value to reduce.
   * If any of the grouping columns are not found in the value, the record is skipped.
   * @param key
   * @param value
   * @param output
   * @param reporter
   * @throws IOException
   */
  public void map(ImmutableBytesWritable key, Result value,
      OutputCollector<ImmutableBytesWritable,Result> output,
      Reporter reporter) throws IOException {

    byte[][] keyVals = extractKeyValues(value);
    if(keyVals != null) {
      ImmutableBytesWritable tKey = createGroupKey(keyVals);
      output.collect(tKey, value);
    }
  }

  /**
   * Extract columns values from the current record. This method returns
   * null if any of the columns are not found.
   *
   * Override this method if you want to deal with nulls differently.
   *
   * @param r
   * @return array of byte values
   */
  protected byte[][] extractKeyValues(Result r) {
    byte[][] keyVals = null;
    ArrayList<byte[]> foundList = new ArrayList<byte[]>();
    int numCols = columns.length;
    if (numCols > 0) {
      for (KeyValue value: r.list()) {
        byte [] column = KeyValue.makeColumn(value.getFamily(),
            value.getQualifier());
        for (int i = 0; i < numCols; i++) {
          if (Bytes.equals(column, columns[i])) {
            foundList.add(value.getValue());
            break;
          }
        }
      }
      if(foundList.size() == numCols) {
        keyVals = foundList.toArray(new byte[numCols][]);
      }
    }
    return keyVals;
  }

  /**
   * Create a key by concatenating multiple column values.
   * Override this function in order to produce different types of keys.
   *
   * @param vals
   * @return key generated by concatenating multiple column values
   */
  protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
    if(vals == null) {
      return null;
    }
    StringBuilder sb = new StringBuilder();
    for(int i = 0; i < vals.length; i++) {
      if(i > 0) {
        sb.append(" ");
      }
      try {
        sb.append(new String(vals[i], HConstants.UTF8_ENCODING));
      } catch (UnsupportedEncodingException e) {
        throw new RuntimeException(e);
      }
    }
    return new ImmutableBytesWritable(Bytes.toBytes(sb.toString()));
  }
}
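A minimal wiring sketch for GroupingTableMap, assuming a hypothetical table "orders" and hypothetical columns; the grouping columns become the map output key exactly as initJob() and map() above describe.

    // Hypothetical sketch: group map output by two made-up columns so rows
    // sharing the same "info:customer info:date" values reach one reduce call.
    JobConf job = new JobConf(new HBaseConfiguration(), GroupingTableMap.class);
    GroupingTableMap.initJob(
        "orders",                              // hypothetical table
        "info:customer info:date info:total",  // columns to fetch (space separated)
        "info:customer info:date",             // columns whose values form the key
        GroupingTableMap.class, job);
    // ...then set a reducer and output format before JobClient.runJob(job).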
@@ -0,0 +1,91 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

/**
 * This is used to partition the output keys into groups of keys.
 * Keys are grouped according to the regions that currently exist
 * so that each reducer fills a single region so load is distributed.
 *
 * @param <K2>
 * @param <V2>
 */
@Deprecated
public class HRegionPartitioner<K2,V2>
implements Partitioner<ImmutableBytesWritable, V2> {
  private final Log LOG = LogFactory.getLog(TableInputFormat.class);
  private HTable table;
  private byte[][] startKeys;

  public void configure(JobConf job) {
    try {
      this.table = new HTable(new HBaseConfiguration(job),
        job.get(TableOutputFormat.OUTPUT_TABLE));
    } catch (IOException e) {
      LOG.error(e);
    }

    try {
      this.startKeys = this.table.getStartKeys();
    } catch (IOException e) {
      LOG.error(e);
    }
  }

  public int getPartition(ImmutableBytesWritable key,
      V2 value, int numPartitions) {
    byte[] region = null;
    // Only one region, return 0
    if (this.startKeys.length == 1){
      return 0;
    }
    try {
      // Not sure if this is cached after a split so we could have problems
      // here if a region splits while mapping
      region = table.getRegionLocation(key.get()).getRegionInfo().getStartKey();
    } catch (IOException e) {
      LOG.error(e);
    }
    for (int i = 0; i < this.startKeys.length; i++){
      if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
        if (i >= numPartitions-1){
          // cover the case where we have fewer reduces than regions.
          return (Integer.toString(i).hashCode()
              & Integer.MAX_VALUE) % numPartitions;
        }
        return i;
      }
    }
    // if the above fails to find a matching start key we need to return something
    return 0;
  }
}
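A minimal sketch of plugging HRegionPartitioner into a table-writing job. The table name is a hypothetical placeholder, and TableOutputFormat.OUTPUT_TABLE, which configure() above reads, is defined elsewhere in this package rather than in this diff.

    // Hypothetical sketch: send each reducer the keys for one existing region.
    JobConf job = new JobConf(new HBaseConfiguration(), HRegionPartitioner.class);
    job.set(TableOutputFormat.OUTPUT_TABLE, "orders");   // hypothetical target table
    job.setPartitionerClass(HRegionPartitioner.class);
    // Ideally numReduceTasks matches the current region count of the table.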
@@ -0,0 +1,76 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Pass the given key and record as-is to reduce
 */
@Deprecated
public class IdentityTableMap
extends MapReduceBase
implements TableMap<ImmutableBytesWritable, Result> {

  /** constructor */
  public IdentityTableMap() {
    super();
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table table name
   * @param columns columns to scan
   * @param mapper mapper class
   * @param job job configuration
   */
  @SuppressWarnings("unchecked")
  public static void initJob(String table, String columns,
    Class<? extends TableMap> mapper, JobConf job) {
    TableMapReduceUtil.initTableMapJob(table, columns, mapper,
      ImmutableBytesWritable.class,
      Result.class, job);
  }

  /**
   * Pass the key, value to reduce
   * @param key
   * @param value
   * @param output
   * @param reporter
   * @throws IOException
   */
  public void map(ImmutableBytesWritable key, Result value,
      OutputCollector<ImmutableBytesWritable,Result> output,
      Reporter reporter) throws IOException {

    // convert
    output.collect(key, value);
  }
}
@@ -0,0 +1,61 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Write to table each key, record pair
 */
@Deprecated
public class IdentityTableReduce
extends MapReduceBase
implements TableReduce<ImmutableBytesWritable, Put> {
  @SuppressWarnings("unused")
  private static final Log LOG =
    LogFactory.getLog(IdentityTableReduce.class.getName());

  /**
   * No aggregation, output pairs of (key, record)
   * @param key
   * @param values
   * @param output
   * @param reporter
   * @throws IOException
   */
  public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
      OutputCollector<ImmutableBytesWritable, Put> output,
      Reporter reporter)
      throws IOException {

    while(values.hasNext()) {
      output.collect(key, values.next());
    }
  }
}
@@ -0,0 +1,423 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.ByteArrayInputStream;
import java.io.OutputStream;
import java.io.StringWriter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;

/**
 * Configuration parameters for building a Lucene index
 */
@Deprecated
public class IndexConfiguration extends Configuration {
  private static final Log LOG = LogFactory.getLog(IndexConfiguration.class);

  static final String HBASE_COLUMN_NAME = "hbase.column.name";
  static final String HBASE_COLUMN_STORE = "hbase.column.store";
  static final String HBASE_COLUMN_INDEX = "hbase.column.index";
  static final String HBASE_COLUMN_TOKENIZE = "hbase.column.tokenize";
  static final String HBASE_COLUMN_BOOST = "hbase.column.boost";
  static final String HBASE_COLUMN_OMIT_NORMS = "hbase.column.omit.norms";
  static final String HBASE_INDEX_ROWKEY_NAME = "hbase.index.rowkey.name";
  static final String HBASE_INDEX_ANALYZER_NAME = "hbase.index.analyzer.name";
  static final String HBASE_INDEX_MAX_BUFFERED_DOCS =
    "hbase.index.max.buffered.docs";
  static final String HBASE_INDEX_MAX_BUFFERED_DELS =
    "hbase.index.max.buffered.dels";
  static final String HBASE_INDEX_MAX_FIELD_LENGTH =
    "hbase.index.max.field.length";
  static final String HBASE_INDEX_MAX_MERGE_DOCS =
    "hbase.index.max.merge.docs";
  static final String HBASE_INDEX_MERGE_FACTOR = "hbase.index.merge.factor";
  // double ramBufferSizeMB;
  static final String HBASE_INDEX_SIMILARITY_NAME =
    "hbase.index.similarity.name";
  static final String HBASE_INDEX_USE_COMPOUND_FILE =
    "hbase.index.use.compound.file";
  static final String HBASE_INDEX_OPTIMIZE = "hbase.index.optimize";

  public static class ColumnConf extends Properties {

    private static final long serialVersionUID = 7419012290580607821L;

    boolean getBoolean(String name, boolean defaultValue) {
      String valueString = getProperty(name);
      if ("true".equals(valueString))
        return true;
      else if ("false".equals(valueString))
        return false;
      else
        return defaultValue;
    }

    void setBoolean(String name, boolean value) {
      setProperty(name, Boolean.toString(value));
    }

    float getFloat(String name, float defaultValue) {
      String valueString = getProperty(name);
      if (valueString == null)
        return defaultValue;
      try {
        return Float.parseFloat(valueString);
      } catch (NumberFormatException e) {
        return defaultValue;
      }
    }

    void setFloat(String name, float value) {
      setProperty(name, Float.toString(value));
    }
  }

  private Map<String, ColumnConf> columnMap =
    new ConcurrentHashMap<String, ColumnConf>();

  public Iterator<String> columnNameIterator() {
    return columnMap.keySet().iterator();
  }

  public boolean isIndex(String columnName) {
    return getColumn(columnName).getBoolean(HBASE_COLUMN_INDEX, true);
  }

  public void setIndex(String columnName, boolean index) {
    getColumn(columnName).setBoolean(HBASE_COLUMN_INDEX, index);
  }

  public boolean isStore(String columnName) {
    return getColumn(columnName).getBoolean(HBASE_COLUMN_STORE, false);
  }

  public void setStore(String columnName, boolean store) {
    getColumn(columnName).setBoolean(HBASE_COLUMN_STORE, store);
  }

  public boolean isTokenize(String columnName) {
    return getColumn(columnName).getBoolean(HBASE_COLUMN_TOKENIZE, true);
  }

  public void setTokenize(String columnName, boolean tokenize) {
    getColumn(columnName).setBoolean(HBASE_COLUMN_TOKENIZE, tokenize);
  }

  public float getBoost(String columnName) {
    return getColumn(columnName).getFloat(HBASE_COLUMN_BOOST, 1.0f);
  }

  public void setBoost(String columnName, float boost) {
    getColumn(columnName).setFloat(HBASE_COLUMN_BOOST, boost);
  }

  public boolean isOmitNorms(String columnName) {
    return getColumn(columnName).getBoolean(HBASE_COLUMN_OMIT_NORMS, true);
  }

  public void setOmitNorms(String columnName, boolean omitNorms) {
    getColumn(columnName).setBoolean(HBASE_COLUMN_OMIT_NORMS, omitNorms);
  }

  private ColumnConf getColumn(String columnName) {
    ColumnConf column = columnMap.get(columnName);
    if (column == null) {
      column = new ColumnConf();
      columnMap.put(columnName, column);
    }
    return column;
  }

  public String getAnalyzerName() {
    return get(HBASE_INDEX_ANALYZER_NAME,
      "org.apache.lucene.analysis.standard.StandardAnalyzer");
  }

  public void setAnalyzerName(String analyzerName) {
    set(HBASE_INDEX_ANALYZER_NAME, analyzerName);
  }

  public int getMaxBufferedDeleteTerms() {
    return getInt(HBASE_INDEX_MAX_BUFFERED_DELS, 1000);
  }

  public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
    setInt(HBASE_INDEX_MAX_BUFFERED_DELS, maxBufferedDeleteTerms);
  }

  public int getMaxBufferedDocs() {
    return getInt(HBASE_INDEX_MAX_BUFFERED_DOCS, 10);
  }

  public void setMaxBufferedDocs(int maxBufferedDocs) {
    setInt(HBASE_INDEX_MAX_BUFFERED_DOCS, maxBufferedDocs);
  }

  public int getMaxFieldLength() {
    return getInt(HBASE_INDEX_MAX_FIELD_LENGTH, Integer.MAX_VALUE);
  }

  public void setMaxFieldLength(int maxFieldLength) {
    setInt(HBASE_INDEX_MAX_FIELD_LENGTH, maxFieldLength);
  }

  public int getMaxMergeDocs() {
    return getInt(HBASE_INDEX_MAX_MERGE_DOCS, Integer.MAX_VALUE);
  }

  public void setMaxMergeDocs(int maxMergeDocs) {
    setInt(HBASE_INDEX_MAX_MERGE_DOCS, maxMergeDocs);
  }

  public int getMergeFactor() {
    return getInt(HBASE_INDEX_MERGE_FACTOR, 10);
  }

  public void setMergeFactor(int mergeFactor) {
    setInt(HBASE_INDEX_MERGE_FACTOR, mergeFactor);
  }

  public String getRowkeyName() {
    return get(HBASE_INDEX_ROWKEY_NAME, "ROWKEY");
  }

  public void setRowkeyName(String rowkeyName) {
    set(HBASE_INDEX_ROWKEY_NAME, rowkeyName);
  }

  public String getSimilarityName() {
    return get(HBASE_INDEX_SIMILARITY_NAME, null);
  }

  public void setSimilarityName(String similarityName) {
    set(HBASE_INDEX_SIMILARITY_NAME, similarityName);
  }

  public boolean isUseCompoundFile() {
    return getBoolean(HBASE_INDEX_USE_COMPOUND_FILE, false);
  }

  public void setUseCompoundFile(boolean useCompoundFile) {
    setBoolean(HBASE_INDEX_USE_COMPOUND_FILE, useCompoundFile);
  }

  public boolean doOptimize() {
    return getBoolean(HBASE_INDEX_OPTIMIZE, true);
  }

  public void setDoOptimize(boolean doOptimize) {
    setBoolean(HBASE_INDEX_OPTIMIZE, doOptimize);
  }

  public void addFromXML(String content) {
    try {
      DocumentBuilder builder = DocumentBuilderFactory.newInstance()
        .newDocumentBuilder();

      Document doc = builder
        .parse(new ByteArrayInputStream(content.getBytes()));

      Element root = doc.getDocumentElement();
      if (!"configuration".equals(root.getTagName())) {
        LOG.fatal("bad conf file: top-level element not <configuration>");
      }

      NodeList props = root.getChildNodes();
      for (int i = 0; i < props.getLength(); i++) {
        Node propNode = props.item(i);
        if (!(propNode instanceof Element)) {
          continue;
        }

        Element prop = (Element) propNode;
        if ("property".equals(prop.getTagName())) {
          propertyFromXML(prop, null);
        } else if ("column".equals(prop.getTagName())) {
          columnConfFromXML(prop);
        } else {
          LOG.warn("bad conf content: element neither <property> nor <column>");
        }
      }
    } catch (Exception e) {
      LOG.fatal("error parsing conf content: " + e);
      throw new RuntimeException(e);
    }
  }

  private void propertyFromXML(Element prop, Properties properties) {
    NodeList fields = prop.getChildNodes();
    String attr = null;
    String value = null;

    for (int j = 0; j < fields.getLength(); j++) {
      Node fieldNode = fields.item(j);
      if (!(fieldNode instanceof Element)) {
        continue;
      }

      Element field = (Element) fieldNode;
      if ("name".equals(field.getTagName())) {
        attr = ((Text) field.getFirstChild()).getData();
      }
      if ("value".equals(field.getTagName()) && field.hasChildNodes()) {
        value = ((Text) field.getFirstChild()).getData();
      }
    }

    if (attr != null && value != null) {
      if (properties == null) {
        set(attr, value);
      } else {
        properties.setProperty(attr, value);
      }
    }
  }

  private void columnConfFromXML(Element column) {
    ColumnConf columnConf = new ColumnConf();
    NodeList props = column.getChildNodes();
    for (int i = 0; i < props.getLength(); i++) {
      Node propNode = props.item(i);
      if (!(propNode instanceof Element)) {
        continue;
      }

      Element prop = (Element) propNode;
      if ("property".equals(prop.getTagName())) {
        propertyFromXML(prop, columnConf);
      } else {
        LOG.warn("bad conf content: element not <property>");
      }
    }

    if (columnConf.getProperty(HBASE_COLUMN_NAME) != null) {
      columnMap.put(columnConf.getProperty(HBASE_COLUMN_NAME), columnConf);
    } else {
      LOG.warn("bad column conf: name not specified");
    }
  }

  public void write(OutputStream out) {
    try {
      Document doc = writeDocument();
      DOMSource source = new DOMSource(doc);
      StreamResult result = new StreamResult(out);
      TransformerFactory transFactory = TransformerFactory.newInstance();
      Transformer transformer = transFactory.newTransformer();
      transformer.transform(source, result);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  private Document writeDocument() {
    Iterator<Map.Entry<String, String>> iter = iterator();
    try {
      Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
        .newDocument();
      Element conf = doc.createElement("configuration");
      doc.appendChild(conf);
      conf.appendChild(doc.createTextNode("\n"));

      Map.Entry<String, String> entry;
      while (iter.hasNext()) {
        entry = iter.next();
        String name = entry.getKey();
        String value = entry.getValue();
        writeProperty(doc, conf, name, value);
      }

      Iterator<String> columnIter = columnNameIterator();
      while (columnIter.hasNext()) {
        writeColumn(doc, conf, columnIter.next());
      }

      return doc;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  private void writeProperty(Document doc, Element parent, String name,
      String value) {
    Element propNode = doc.createElement("property");
    parent.appendChild(propNode);

    Element nameNode = doc.createElement("name");
    nameNode.appendChild(doc.createTextNode(name));
    propNode.appendChild(nameNode);

    Element valueNode = doc.createElement("value");
    valueNode.appendChild(doc.createTextNode(value));
    propNode.appendChild(valueNode);

    parent.appendChild(doc.createTextNode("\n"));
  }

  private void writeColumn(Document doc, Element parent, String columnName) {
    Element column = doc.createElement("column");
    parent.appendChild(column);
    column.appendChild(doc.createTextNode("\n"));

    ColumnConf columnConf = getColumn(columnName);
    for (Map.Entry<Object, Object> entry : columnConf.entrySet()) {
      if (entry.getKey() instanceof String
          && entry.getValue() instanceof String) {
        writeProperty(doc, column, (String) entry.getKey(), (String) entry
            .getValue());
      }
    }
  }

  @Override
  public String toString() {
    StringWriter writer = new StringWriter();
    try {
      Document doc = writeDocument();
      DOMSource source = new DOMSource(doc);
      StreamResult result = new StreamResult(writer);
      TransformerFactory transFactory = TransformerFactory.newInstance();
      Transformer transformer = transFactory.newTransformer();
      transformer.transform(source, result);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return writer.toString();
  }
}
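A minimal sketch of building an IndexConfiguration in code and dumping the XML that BuildTableIndex would otherwise read from its -indexConf file; the column name and setting values are hypothetical, and only setters defined above are used.

    // Hypothetical sketch (not part of the commit).
    IndexConfiguration iconf = new IndexConfiguration();
    iconf.setRowkeyName("ROWKEY");
    iconf.setDoOptimize(true);
    iconf.setMergeFactor(10);
    // per-column settings for a made-up column "contents:text"
    iconf.setIndex("contents:text", true);
    iconf.setTokenize("contents:text", true);
    iconf.setStore("contents:text", false);
    iconf.setBoost("contents:text", 2.0f);
    // toString() serializes the <configuration>/<property>/<column> XML
    // that addFromXML() parses.
    System.out.println(iconf);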
@@ -0,0 +1,166 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.io.File;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.store.FSDirectory;

/**
 * Create a local index, unwrap Lucene documents created by reduce, add them to
 * the index, and copy the index to the destination.
 */
@Deprecated
public class IndexOutputFormat extends
    FileOutputFormat<ImmutableBytesWritable, LuceneDocumentWrapper> {
  static final Log LOG = LogFactory.getLog(IndexOutputFormat.class);

  private Random random = new Random();

  @Override
  public RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>
  getRecordWriter(final FileSystem fs, JobConf job, String name,
      final Progressable progress)
  throws IOException {

    final Path perm = new Path(FileOutputFormat.getOutputPath(job), name);
    final Path temp = job.getLocalPath("index/_"
        + Integer.toString(random.nextInt()));

    LOG.info("To index into " + perm);

    // delete old, if any
    fs.delete(perm, true);

    final IndexConfiguration indexConf = new IndexConfiguration();
    String content = job.get("hbase.index.conf");
    if (content != null) {
      indexConf.addFromXML(content);
    }

    String analyzerName = indexConf.getAnalyzerName();
    Analyzer analyzer;
    try {
      Class<?> analyzerClass = Class.forName(analyzerName);
      analyzer = (Analyzer) analyzerClass.newInstance();
    } catch (Exception e) {
      throw new IOException("Error in creating an analyzer object "
          + analyzerName);
    }

    // build locally first
    final IndexWriter writer = new IndexWriter(
        FSDirectory.open(new File(fs.startLocalOutput(perm, temp).toString())),
        analyzer, true, IndexWriter.MaxFieldLength.LIMITED);

    // no delete, so no need for maxBufferedDeleteTerms
    writer.setMaxBufferedDocs(indexConf.getMaxBufferedDocs());
    writer.setMaxFieldLength(indexConf.getMaxFieldLength());
    writer.setMaxMergeDocs(indexConf.getMaxMergeDocs());
    writer.setMergeFactor(indexConf.getMergeFactor());
    String similarityName = indexConf.getSimilarityName();
    if (similarityName != null) {
      try {
        Class<?> similarityClass = Class.forName(similarityName);
        Similarity similarity = (Similarity) similarityClass.newInstance();
        writer.setSimilarity(similarity);
      } catch (Exception e) {
        throw new IOException("Error in creating a similarity object "
            + similarityName);
      }
    }
    writer.setUseCompoundFile(indexConf.isUseCompoundFile());

    return new RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>() {
      boolean closed;
      private long docCount = 0;

      public void write(ImmutableBytesWritable key,
          LuceneDocumentWrapper value)
      throws IOException {
        // unwrap and index doc
        Document doc = value.get();
        writer.addDocument(doc);
        docCount++;
        progress.progress();
      }

      public void close(final Reporter reporter) throws IOException {
        // spawn a thread to give progress heartbeats
        Thread prog = new Thread() {
          @Override
          public void run() {
            while (!closed) {
              try {
                reporter.setStatus("closing");
                Thread.sleep(1000);
              } catch (InterruptedException e) {
                continue;
              } catch (Throwable e) {
                return;
              }
            }
          }
        };

        try {
          prog.start();

          // optimize index
          if (indexConf.doOptimize()) {
            if (LOG.isInfoEnabled()) {
              LOG.info("Optimizing index.");
            }
            writer.optimize();
          }

          // close index
          writer.close();
          if (LOG.isInfoEnabled()) {
            LOG.info("Done indexing " + docCount + " docs.");
          }

          // copy to perm destination in dfs
          fs.completeLocalOutput(perm, temp);
          if (LOG.isInfoEnabled()) {
            LOG.info("Copy done.");
          }
        } finally {
          closed = true;
        }
      }
    };
  }
}
@@ -0,0 +1,108 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Construct a Lucene document per row, which is consumed by IndexOutputFormat
 * to build a Lucene index
 */
@Deprecated
public class IndexTableReduce extends MapReduceBase implements
    Reducer<ImmutableBytesWritable, Result, ImmutableBytesWritable, LuceneDocumentWrapper> {
  private static final Log LOG = LogFactory.getLog(IndexTableReduce.class);
  private IndexConfiguration indexConf;

  @Override
  public void configure(JobConf job) {
    super.configure(job);
    indexConf = new IndexConfiguration();
    String content = job.get("hbase.index.conf");
    if (content != null) {
      indexConf.addFromXML(content);
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Index conf: " + indexConf);
    }
  }

  @Override
  public void close() throws IOException {
    super.close();
  }

  public void reduce(ImmutableBytesWritable key, Iterator<Result> values,
      OutputCollector<ImmutableBytesWritable, LuceneDocumentWrapper> output,
      Reporter reporter)
  throws IOException {
    Document doc = null;
    while(values.hasNext()) {
      Result r = values.next();
      if (doc == null) {
        doc = new Document();
        // index and store row key, row key already UTF-8 encoded
        Field keyField = new Field(indexConf.getRowkeyName(),
          Bytes.toString(key.get(), key.getOffset(), key.getLength()),
          Field.Store.YES, Field.Index.NOT_ANALYZED);
        keyField.setOmitNorms(true);
        doc.add(keyField);
      }
      // each column (name-value pair) is a field (name-value pair)
      for (KeyValue kv: r.list()) {
        // name is already UTF-8 encoded
        String column = Bytes.toString(KeyValue.makeColumn(kv.getFamily(),
            kv.getQualifier()));
        byte[] columnValue = kv.getValue();
        Field.Store store = indexConf.isStore(column)?
          Field.Store.YES: Field.Store.NO;
        Field.Index index = indexConf.isIndex(column)?
          (indexConf.isTokenize(column)?
            Field.Index.ANALYZED: Field.Index.NOT_ANALYZED):
            Field.Index.NO;

        // UTF-8 encode value
        Field field = new Field(column, Bytes.toString(columnValue),
          store, index);
        field.setBoost(indexConf.getBoost(column));
        field.setOmitNorms(indexConf.isOmitNorms(column));

        doc.add(field);
      }
    }
    output.collect(key, new LuceneDocumentWrapper(doc));
  }
}
@@ -0,0 +1,56 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import org.apache.hadoop.io.Writable;
import org.apache.lucene.document.Document;

/**
 * A utility class used to pass a lucene document from reduce to OutputFormat.
 * It doesn't really serialize/deserialize a lucene document.
 */
@Deprecated
public class LuceneDocumentWrapper implements Writable {
  protected Document doc;

  /**
   * @param doc
   */
  public LuceneDocumentWrapper(Document doc) {
    this.doc = doc;
  }

  /**
   * @return the document
   */
  public Document get() {
    return doc;
  }

  public void readFields(DataInput in) {
    // intentionally left blank
  }

  public void write(DataOutput out) {
    // intentionally left blank
  }
}
@@ -0,0 +1,137 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A job with a map to count rows.
 * Map outputs table rows IF the input row has columns that have content.
 * Uses an {@link IdentityReducer}
 */
@Deprecated
public class RowCounter extends Configured implements Tool {
  // Name of this 'program'
  static final String NAME = "rowcounter";

  /**
   * Mapper that runs the count.
   */
  static class RowCounterMapper
  implements TableMap<ImmutableBytesWritable, Result> {
    private static enum Counters {ROWS}

    public void map(ImmutableBytesWritable row, Result values,
        OutputCollector<ImmutableBytesWritable, Result> output,
        Reporter reporter)
    throws IOException {
      boolean content = false;

      for (KeyValue value: values.list()) {
        if (value.getValue().length > 0) {
          content = true;
          break;
        }
      }
      if (!content) {
        // Don't count rows that are all empty values.
        return;
      }
      // Give out same value every time.  We're only interested in the row/key
      reporter.incrCounter(Counters.ROWS, 1);
    }

    public void configure(JobConf jc) {
      // Nothing to do.
    }

    public void close() throws IOException {
      // Nothing to do.
    }
  }

  /**
   * @param args
   * @return the JobConf
   * @throws IOException
   */
  public JobConf createSubmittableJob(String[] args) throws IOException {
    JobConf c = new JobConf(getConf(), getClass());
    c.setJobName(NAME);
    // Columns are space delimited
    StringBuilder sb = new StringBuilder();
    final int columnoffset = 2;
    for (int i = columnoffset; i < args.length; i++) {
      if (i > columnoffset) {
        sb.append(" ");
      }
      sb.append(args[i]);
    }
    // Second argument is the table name.
    TableMapReduceUtil.initTableMapJob(args[1], sb.toString(),
      RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, c);
    c.setNumReduceTasks(0);
    // First arg is the output directory.
    FileOutputFormat.setOutputPath(c, new Path(args[0]));
    return c;
  }

  static int printUsage() {
    System.out.println(NAME +
      " <outputdir> <tablename> <column1> [<column2>...]");
    return -1;
  }

  public int run(final String[] args) throws Exception {
    // Make sure there are at least 3 parameters
    if (args.length < 3) {
      System.err.println("ERROR: Wrong number of parameters: " + args.length);
      return printUsage();
    }
    JobClient.runJob(createSubmittableJob(args));
    return 0;
  }

  /**
   * @param args
   * @throws Exception
   */
  public static void main(String[] args) throws Exception {
    HBaseConfiguration c = new HBaseConfiguration();
    int errCode = ToolRunner.run(c, new RowCounter(), args);
    System.exit(errCode);
  }
}
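A minimal sketch of invoking the row counter from code, equivalent to running the "rowcounter" program registered in Driver; the output directory, table name, and column are hypothetical placeholders.

    // Hypothetical sketch (not part of the commit).
    String[] args = new String[] {
        "/tmp/rowcounter-out",  // outputdir (hypothetical)
        "webtable",             // tablename (hypothetical)
        "contents:text"         // at least one column
    };
    int exitCode = ToolRunner.run(new HBaseConfiguration(), new RowCounter(), args);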
@@ -0,0 +1,6 @@

# ResourceBundle properties file for RowCounter MR job

CounterGroupName= RowCounter

ROWS.name= Rows
@@ -0,0 +1,83 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.util.StringUtils;

/**
 * Convert HBase tabular data into a format that is consumable by Map/Reduce.
 */
@Deprecated
public class TableInputFormat extends TableInputFormatBase implements
    JobConfigurable {
  private final Log LOG = LogFactory.getLog(TableInputFormat.class);

  /**
   * space delimited list of columns
   */
  public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";

  public void configure(JobConf job) {
    Path[] tableNames = FileInputFormat.getInputPaths(job);
    String colArg = job.get(COLUMN_LIST);
    String[] colNames = colArg.split(" ");
    byte [][] m_cols = new byte[colNames.length][];
    for (int i = 0; i < m_cols.length; i++) {
      m_cols[i] = Bytes.toBytes(colNames[i]);
    }
    setInputColumns(m_cols);
    try {
      setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
    }
  }

  public void validateInput(JobConf job) throws IOException {
    // expecting exactly one path
    Path [] tableNames = FileInputFormat.getInputPaths(job);
    if (tableNames == null || tableNames.length > 1) {
      throw new IOException("expecting one table name");
    }

    // connected to table?
    if (getHTable() == null) {
      throw new IOException("could not connect to table '" +
        tableNames[0].getName() + "'");
    }

    // expecting at least one column
    String colArg = job.get(COLUMN_LIST);
    if (colArg == null || colArg.length() == 0) {
      throw new IOException("expecting at least one column");
    }
  }
}
@@ -0,0 +1,347 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.StringUtils;

/**
 * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
 * byte[] of input columns and optionally a {@link Filter}.
 * Subclasses may use other TableRecordReader implementations.
 * <p>
 * An example of a subclass:
 * <pre>
 *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
 *
 *     public void configure(JobConf job) {
 *       HTable exampleTable = new HTable(new HBaseConfiguration(job),
 *         Bytes.toBytes("exampleTable"));
 *       // mandatory
 *       setHTable(exampleTable);
 *       byte [][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
 *         Bytes.toBytes("columnB") };
 *       // mandatory
 *       setInputColumns(inputColumns);
 *       Filter exampleFilter = new PrefixFilter(Bytes.toBytes("keyPrefix"));
 *       // optional
 *       setRowFilter(exampleFilter);
 *     }
 *
 *     public void validateInput(JobConf job) throws IOException {
 *     }
 *   }
 * </pre>
 */

@Deprecated
public abstract class TableInputFormatBase
implements InputFormat<ImmutableBytesWritable, Result> {
  final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
  private byte [][] inputColumns;
  private HTable table;
  private TableRecordReader tableRecordReader;
  private Filter rowFilter;

  /**
   * Iterate over an HBase table's data, returning (ImmutableBytesWritable, Result) pairs
   */
  protected class TableRecordReader
  implements RecordReader<ImmutableBytesWritable, Result> {
    private byte [] startRow;
    private byte [] endRow;
    private byte [] lastRow;
    private Filter trrRowFilter;
    private ResultScanner scanner;
    private HTable htable;
    private byte [][] trrInputColumns;

    /**
     * Restart from survivable exceptions by creating a new scanner.
     *
     * @param firstRow
     * @throws IOException
     */
    public void restart(byte[] firstRow) throws IOException {
      if ((endRow != null) && (endRow.length > 0)) {
        if (trrRowFilter != null) {
          Scan scan = new Scan(firstRow, endRow);
          scan.addColumns(trrInputColumns);
          scan.setFilter(trrRowFilter);
          this.scanner = this.htable.getScanner(scan);
        } else {
          LOG.debug("TIFB.restart, firstRow: " +
            Bytes.toStringBinary(firstRow) + ", endRow: " +
            Bytes.toStringBinary(endRow));
          Scan scan = new Scan(firstRow, endRow);
          scan.addColumns(trrInputColumns);
          this.scanner = this.htable.getScanner(scan);
        }
      } else {
        LOG.debug("TIFB.restart, firstRow: " +
          Bytes.toStringBinary(firstRow) + ", no endRow");

        Scan scan = new Scan(firstRow);
        scan.addColumns(trrInputColumns);
        // scan.setFilter(trrRowFilter);
        this.scanner = this.htable.getScanner(scan);
      }
    }

    /**
     * Build the scanner. Not done in constructor to allow for extension.
     *
     * @throws IOException
     */
    public void init() throws IOException {
      restart(startRow);
    }

    /**
     * @param htable the {@link HTable} to scan.
     */
    public void setHTable(HTable htable) {
      this.htable = htable;
    }

    /**
     * @param inputColumns the columns to be placed in {@link Result}.
     */
    public void setInputColumns(final byte [][] inputColumns) {
      this.trrInputColumns = inputColumns;
    }

    /**
     * @param startRow the first row in the split
     */
    public void setStartRow(final byte [] startRow) {
      this.startRow = startRow;
    }

    /**
     * @param endRow the last row in the split
     */
    public void setEndRow(final byte [] endRow) {
      this.endRow = endRow;
    }

    /**
     * @param rowFilter the {@link Filter} to be used.
     */
    public void setRowFilter(Filter rowFilter) {
      this.trrRowFilter = rowFilter;
    }

    public void close() {
      this.scanner.close();
    }

    /**
     * @return ImmutableBytesWritable
     *
     * @see org.apache.hadoop.mapred.RecordReader#createKey()
     */
    public ImmutableBytesWritable createKey() {
      return new ImmutableBytesWritable();
    }

    /**
     * @return Result
     *
     * @see org.apache.hadoop.mapred.RecordReader#createValue()
     */
    public Result createValue() {
      return new Result();
    }

    public long getPos() {
      // This should be the ordinal tuple in the range;
      // not clear how to calculate...
      return 0;
    }

    public float getProgress() {
      // Depends on the total number of tuples and getPos
      return 0;
    }

    /**
     * @param key ImmutableBytesWritable as input key.
     * @param value Result as input value
     * @return true if there was more data
     * @throws IOException
     */
    public boolean next(ImmutableBytesWritable key, Result value)
    throws IOException {
      Result result;
      try {
        result = this.scanner.next();
      } catch (UnknownScannerException e) {
        LOG.debug("recovered from " + StringUtils.stringifyException(e));
        restart(lastRow);
        this.scanner.next();    // skip presumed already mapped row
        result = this.scanner.next();
      }

      if (result != null && result.size() > 0) {
        key.set(result.getRow());
        lastRow = key.get();
        Writables.copyWritable(result, value);
        return true;
      }
      return false;
    }
  }

  /**
   * Builds a TableRecordReader. If no TableRecordReader was provided, uses
   * the default.
   *
   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
   *      JobConf, Reporter)
   */
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter)
  throws IOException {
    TableSplit tSplit = (TableSplit) split;
    TableRecordReader trr = this.tableRecordReader;
    // if no table record reader was provided use default
    if (trr == null) {
      trr = new TableRecordReader();
    }
    trr.setStartRow(tSplit.getStartRow());
    trr.setEndRow(tSplit.getEndRow());
    trr.setHTable(this.table);
    trr.setInputColumns(this.inputColumns);
    trr.setRowFilter(this.rowFilter);
    trr.init();
    return trr;
  }

  /**
   * Calculates the splits that will serve as input for the map tasks.
   * <ul>
   * Splits are created in number equal to the smallest between numSplits and
   * the number of {@link HRegion}s in the table. If the number of splits is
   * smaller than the number of {@link HRegion}s then splits are spanned across
   * multiple {@link HRegion}s and are grouped the most evenly possible. In the
   * case splits are uneven the bigger splits are placed first in the
   * {@link InputSplit} array.
   *
   * @param job the map task {@link JobConf}
   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
   *
   * @return the input splits
   *
   * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
   */
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    if (this.table == null) {
      throw new IOException("No table was provided");
    }
    byte [][] startKeys = this.table.getStartKeys();
    if (startKeys == null || startKeys.length == 0) {
      throw new IOException("Expecting at least one region");
    }
    if (this.inputColumns == null || this.inputColumns.length == 0) {
      throw new IOException("Expecting at least one column");
    }
    int realNumSplits = numSplits > startKeys.length ? startKeys.length :
      numSplits;
    InputSplit[] splits = new InputSplit[realNumSplits];
    int middle = startKeys.length / realNumSplits;
    int startPos = 0;
    for (int i = 0; i < realNumSplits; i++) {
      int lastPos = startPos + middle;
      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
      String regionLocation = table.getRegionLocation(startKeys[startPos]).
        getServerAddress().getHostname();
      splits[i] = new TableSplit(this.table.getTableName(),
        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
          HConstants.EMPTY_START_ROW, regionLocation);
      LOG.info("split: " + i + "->" + splits[i]);
      startPos = lastPos;
    }
    return splits;
  }

  /**
   * @param inputColumns to be passed in {@link Result} to the map task.
   */
  protected void setInputColumns(byte [][] inputColumns) {
    this.inputColumns = inputColumns;
  }

  /**
   * Allows subclasses to get the {@link HTable}.
   */
  protected HTable getHTable() {
    return this.table;
  }

  /**
   * Allows subclasses to set the {@link HTable}.
   *
   * @param table to get the data from
   */
  protected void setHTable(HTable table) {
    this.table = table;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   *
   * @param tableRecordReader
   *                to provide other {@link TableRecordReader} implementations.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }

  /**
   * Allows subclasses to set the {@link Filter} to be used.
   *
   * @param rowFilter
   */
  protected void setRowFilter(Filter rowFilter) {
    this.rowFilter = rowFilter;
  }
}
@@ -0,0 +1,39 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.Mapper;

/**
 * Scan an HBase table to sort by a specified sort column.
 * If the column does not exist, the record is not passed to Reduce.
 *
 * @param <K> WritableComparable key class
 * @param <V> Writable value class
 */
@Deprecated
public interface TableMap<K extends WritableComparable<? super K>, V extends Writable>
extends Mapper<ImmutableBytesWritable, Result, K, V> {

}
@@ -0,0 +1,184 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@Deprecated
@SuppressWarnings("unchecked")
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<? extends WritableComparable> outputKeyClass,
    Class<? extends Writable> outputValueClass, JobConf job) {

    job.setInputFormat(TableInputFormat.class);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
  throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
  throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      HTable outputTable = new HTable(new HBaseConfiguration(job), table);
      int regions = outputTable.getRegionsInfo().size();
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(outputTable.getRegionsInfo().size());
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, JobConf job)
  throws IOException {
    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
    int regions = outputTable.getRegionsInfo().size();
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumMapTasks(String table, JobConf job)
  throws IOException {
    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
    int regions = outputTable.getRegionsInfo().size();
    if (job.getNumMapTasks() > regions)
      job.setNumMapTasks(regions);
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
  throws IOException {
    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
    int regions = outputTable.getRegionsInfo().size();
    job.setNumReduceTasks(regions);
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
  throws IOException {
    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
    int regions = outputTable.getRegionsInfo().size();
    job.setNumMapTasks(regions);
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job configuration to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   * iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }
}
@@ -0,0 +1,106 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

/**
 * Convert Map/Reduce output and write it to an HBase table
 */
@Deprecated
public class TableOutputFormat extends
    FileOutputFormat<ImmutableBytesWritable, Put> {

  /** JobConf parameter that specifies the output table */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
  private final Log LOG = LogFactory.getLog(TableOutputFormat.class);

  /**
   * Convert Reduce output (key, value) to (ImmutableBytesWritable, Put)
   * and write to an HBase table
   */
  protected static class TableRecordWriter
    implements RecordWriter<ImmutableBytesWritable, Put> {
    private HTable m_table;

    /**
     * Instantiate a TableRecordWriter with the HBase HClient for writing.
     *
     * @param table
     */
    public TableRecordWriter(HTable table) {
      m_table = table;
    }

    public void close(Reporter reporter)
    throws IOException {
      m_table.flushCommits();
    }

    public void write(ImmutableBytesWritable key,
        Put value) throws IOException {
      m_table.put(new Put(value));
    }
  }

  @Override
  @SuppressWarnings("unchecked")
  public RecordWriter getRecordWriter(FileSystem ignored,
      JobConf job, String name, Progressable progress) throws IOException {

    // expecting exactly one path

    String tableName = job.get(OUTPUT_TABLE);
    HTable table = null;
    try {
      table = new HTable(new HBaseConfiguration(job), tableName);
    } catch (IOException e) {
      LOG.error(e);
      throw e;
    }
    table.setAutoFlush(false);
    return new TableRecordWriter(table);
  }

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
  throws FileAlreadyExistsException, InvalidJobConfException, IOException {

    String tableName = job.get(OUTPUT_TABLE);
    if (tableName == null) {
      throw new IOException("Must specify table name");
    }
  }
}
@@ -0,0 +1,39 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.Reducer;

/**
 * Write a table, sorting by the input key
 *
 * @param <K> key class
 * @param <V> value class
 */
@Deprecated
@SuppressWarnings("unchecked")
public interface TableReduce<K extends WritableComparable, V extends Writable>
extends Reducer<K, V, ImmutableBytesWritable, Put> {

}
@@ -0,0 +1,113 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.InputSplit;

/**
 * A table split corresponds to a key range [low, high)
 */
@Deprecated
public class TableSplit implements InputSplit, Comparable<TableSplit> {
  private byte [] m_tableName;
  private byte [] m_startRow;
  private byte [] m_endRow;
  private String m_regionLocation;

  /** default constructor */
  public TableSplit() {
    this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
      HConstants.EMPTY_BYTE_ARRAY, "");
  }

  /**
   * Constructor
   * @param tableName
   * @param startRow
   * @param endRow
   * @param location
   */
  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
      final String location) {
    this.m_tableName = tableName;
    this.m_startRow = startRow;
    this.m_endRow = endRow;
    this.m_regionLocation = location;
  }

  /** @return table name */
  public byte [] getTableName() {
    return this.m_tableName;
  }

  /** @return starting row key */
  public byte [] getStartRow() {
    return this.m_startRow;
  }

  /** @return end row key */
  public byte [] getEndRow() {
    return this.m_endRow;
  }

  /** @return the region's hostname */
  public String getRegionLocation() {
    return this.m_regionLocation;
  }

  public String[] getLocations() {
    return new String[] {this.m_regionLocation};
  }

  public long getLength() {
    // Not clear how to obtain this... seems to be used only for sorting splits
    return 0;
  }

  public void readFields(DataInput in) throws IOException {
    this.m_tableName = Bytes.readByteArray(in);
    this.m_startRow = Bytes.readByteArray(in);
    this.m_endRow = Bytes.readByteArray(in);
    this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
  }

  public void write(DataOutput out) throws IOException {
    Bytes.writeByteArray(out, this.m_tableName);
    Bytes.writeByteArray(out, this.m_startRow);
    Bytes.writeByteArray(out, this.m_endRow);
    Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
  }

  @Override
  public String toString() {
    return m_regionLocation + ":" +
      Bytes.toStringBinary(m_startRow) + "," + Bytes.toStringBinary(m_endRow);
  }

  public int compareTo(TableSplit o) {
    return Bytes.compareTo(getStartRow(), o.getStartRow());
  }
}
@@ -0,0 +1,128 @@
/*
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce</a>
Input/OutputFormats, a table indexing MapReduce job, and utility methods.

<h2>Table of Contents</h2>
<ul>
<li><a href="#classpath">HBase, MapReduce and the CLASSPATH</a></li>
<li><a href="#sink">HBase as MapReduce job data source and sink</a></li>
<li><a href="#examples">Example Code</a></li>
</ul>

<h2><a name="classpath">HBase, MapReduce and the CLASSPATH</a></h2>

<p>MapReduce jobs deployed to a MapReduce cluster do not by default have access
to the HBase configuration under <code>$HBASE_CONF_DIR</code> nor to HBase classes.
You could add <code>hbase-site.xml</code> to $HADOOP_HOME/conf and add
<code>hbase-X.X.X.jar</code> to the <code>$HADOOP_HOME/lib</code> and copy these
changes across your cluster, but the cleanest means of adding hbase configuration
and classes to the cluster <code>CLASSPATH</code> is by uncommenting
<code>HADOOP_CLASSPATH</code> in <code>$HADOOP_HOME/conf/hadoop-env.sh</code>
and adding the path to the hbase jar and <code>$HBASE_CONF_DIR</code> directory.
Then copy the amended configuration around the cluster.
You'll probably need to restart the MapReduce cluster if you want it to notice
the new configuration.
</p>

<p>For example, here is how you would amend <code>hadoop-env.sh</code> adding the
built hbase jar, hbase conf, and the <code>PerformanceEvaluation</code> class from
the built hbase test jar to the hadoop <code>CLASSPATH</code>:

<blockquote><pre># Extra Java CLASSPATH elements. Optional.
# export HADOOP_CLASSPATH=
export HADOOP_CLASSPATH=$HBASE_HOME/build/test:$HBASE_HOME/build/hbase-X.X.X.jar:$HBASE_HOME/build/hbase-X.X.X-test.jar:$HBASE_HOME/conf</pre></blockquote>

<p>Expand <code>$HBASE_HOME</code> in the above appropriately to suit your
local environment.</p>

<p>After copying the above change around your cluster, this is how you would run
the PerformanceEvaluation MR job to put up 4 clients (presumes a ready mapreduce
cluster):

<blockquote><pre>$HADOOP_HOME/bin/hadoop org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 4</pre></blockquote>

The PerformanceEvaluation class will be found on the CLASSPATH because you
added <code>$HBASE_HOME/build/test</code> to <code>HADOOP_CLASSPATH</code>.
</p>

<p>Another possibility, if for example you do not have access to hadoop-env.sh or
are unable to restart the hadoop cluster, is bundling the hbase jar into a mapreduce
job jar, adding it and its dependencies under the job jar <code>lib/</code>
directory and the hbase conf into a job jar <code>conf/</code> directory.
</p>

<h2><a name="sink">HBase as MapReduce job data source and sink</a></h2>

<p>HBase can be used as a data source, {@link org.apache.hadoop.hbase.mapred.TableInputFormat TableInputFormat},
and data sink, {@link org.apache.hadoop.hbase.mapred.TableOutputFormat TableOutputFormat}, for MapReduce jobs.
When writing MapReduce jobs that read or write HBase, you'll probably want to subclass
{@link org.apache.hadoop.hbase.mapred.TableMap TableMap} and/or
{@link org.apache.hadoop.hbase.mapred.TableReduce TableReduce}. See the do-nothing
pass-through classes {@link org.apache.hadoop.hbase.mapred.IdentityTableMap IdentityTableMap} and
{@link org.apache.hadoop.hbase.mapred.IdentityTableReduce IdentityTableReduce} for basic usage. For a more
involved example, see {@link org.apache.hadoop.hbase.mapred.BuildTableIndex BuildTableIndex}
or review the <code>org.apache.hadoop.hbase.mapred.TestTableMapReduce</code> unit test.
</p>
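
<p>As a rough sketch of what such a subclass looks like against this (deprecated)
mapred API, the mapper below copies one cell of each input row into a <code>Put</code>
keyed by the same row. The class name and the <code>myfamily</code> column family are
illustrative only (imports elided); the shape follows IdentityTableMap and the
TestTableMapReduce unit test:
</p>

<blockquote><pre>
public class SampleTableMap extends MapReduceBase
implements TableMap<ImmutableBytesWritable, Put> {
  public void map(ImmutableBytesWritable row, Result value,
      OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
  throws IOException {
    // Fetch the cell we want to copy; skip rows that do not carry it.
    byte [] v = value.getValue(Bytes.toBytes("myfamily"), null);
    if (v == null) {
      return;
    }
    Put put = new Put(row.get());
    put.add(Bytes.toBytes("myfamily"), null, v);
    output.collect(row, put);
  }
}
</pre></blockquote>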

<p>When running mapreduce jobs that have hbase as a source or sink, you'll need to
specify the source/sink table and column names in your configuration.</p>
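
<p>Concretely, the source table and its space delimited column list travel in the job
configuration under {@link org.apache.hadoop.hbase.mapred.TableInputFormat#COLUMN_LIST}
(<code>hbase.mapred.tablecolumns</code>), and the sink table under
{@link org.apache.hadoop.hbase.mapred.TableOutputFormat#OUTPUT_TABLE}
(<code>hbase.mapred.outputtable</code>). A sketch of setting them by hand, with
placeholder table and family names, might look as follows; normally
{@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil TableMapReduceUtil} sets these for you:
</p>

<blockquote><pre>
JobConf job = new JobConf(new HBaseConfiguration());
// Source: the table to read and a space delimited list of input columns.
FileInputFormat.addInputPaths(job, "sourceTable");
job.set(TableInputFormat.COLUMN_LIST, "contents: anchor:");
// Sink: the table that emitted Puts are written into.
job.set(TableOutputFormat.OUTPUT_TABLE, "targetTable");
</pre></blockquote>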

<p>When reading from hbase, the TableInputFormat asks hbase for the list of
regions and makes a map-per-region or <code>mapred.map.tasks</code> maps,
whichever is smaller (if your job only has two maps, raise <code>mapred.map.tasks</code>
to a number greater than the number of regions). Maps will run on the adjacent TaskTracker
if you are running a TaskTracker and RegionServer per node.
When writing, it may make sense to avoid the reduce step and write back into
hbase from inside your map. You'd do this when your job does not need the sort
and collation that mapreduce does on the map-emitted data; on insert,
hbase 'sorts' itself, so there is no point double-sorting (and shuffling data around
your mapreduce cluster) unless you need to. If you do not need the reduce,
you might have your map emit counts of records processed so that the
framework's report at the end of your job has meaning, or set the number of
reduces to zero and use TableOutputFormat. See the example code
below. If running the reduce step makes sense in your case, it's usually better
to have lots of reducers so load is spread across the hbase cluster.</p>
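
<p>For instance, a minimal map-only setup that reads one table and writes Puts
straight back into another could look like the following (the table names, the
<code>myfamily:</code> column list, and the <code>SampleTableMap</code> mapper sketched
above are all illustrative):
</p>

<blockquote><pre>
JobConf job = new JobConf(new HBaseConfiguration(), SampleTableMap.class);
job.setJobName("copy one column family");
TableMapReduceUtil.initTableMapJob("sourceTable", "myfamily:",
  SampleTableMap.class, ImmutableBytesWritable.class, Put.class, job);
// No reduce: the map writes directly to the sink table via TableOutputFormat.
job.setNumReduceTasks(0);
job.setOutputFormat(TableOutputFormat.class);
job.set(TableOutputFormat.OUTPUT_TABLE, "targetTable");
JobClient.runJob(job);
</pre></blockquote>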

<p>There is also a new hbase partitioner that will run as many reducers as
currently existing regions. The
{@link org.apache.hadoop.hbase.mapred.HRegionPartitioner} is suitable
when your table is large and your upload is not such that it will greatly
alter the number of existing regions when done; otherwise use the default
partitioner.
</p>

<h2><a name="examples">Example Code</a></h2>
<h3>Sample Row Counter</h3>
<p>See {@link org.apache.hadoop.hbase.mapred.RowCounter}. You should be able to run
it by doing: <code>% ./bin/hadoop jar hbase-X.X.X.jar</code>. This will invoke
the hbase MapReduce Driver class. Select 'rowcounter' from the choice of jobs
offered. You may need to add the hbase conf directory to <code>$HADOOP_HOME/conf/hadoop-env.sh#HADOOP_CLASSPATH</code>
so the rowcounter gets pointed at the right hbase cluster (or, build a new jar
with an appropriate hbase-site.xml built into your job jar).
</p>
<h3>PerformanceEvaluation</h3>
<p>See org.apache.hadoop.hbase.PerformanceEvaluation from hbase src/test. It runs
a mapreduce job to run concurrent clients reading and writing hbase.
</p>

*/
package org.apache.hadoop.hbase.mapred;
@@ -0,0 +1,244 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
 * on our tables is simple - take every row in the table, reverse the value of
 * a particular cell, and write it back to the table.
 */
public class TestTableMapReduce extends MultiRegionTable {
  private static final Log LOG =
    LogFactory.getLog(TestTableMapReduce.class.getName());

  static final String MULTI_REGION_TABLE_NAME = "mrtest";
  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
  static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");

  private static final byte [][] columns = new byte [][] {
    INPUT_FAMILY,
    OUTPUT_FAMILY
  };

  /** constructor */
  public TestTableMapReduce() {
    super(Bytes.toString(INPUT_FAMILY));
    desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
    desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
    desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
  }

  /**
   * Pass the given key and processed record to reduce
   */
  public static class ProcessContentsMapper
  extends MapReduceBase
  implements TableMap<ImmutableBytesWritable, Put> {
    /**
     * Pass the key, and reversed value to reduce
     * @param key
     * @param value
     * @param output
     * @param reporter
     * @throws IOException
     */
    public void map(ImmutableBytesWritable key, Result value,
      OutputCollector<ImmutableBytesWritable, Put> output,
      Reporter reporter)
    throws IOException {
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
        cf = value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException("Wrong input columns. Missing: '" +
          Bytes.toString(INPUT_FAMILY) + "'.");
      }

      // Get the original value and reverse it

      String originalValue = new String(value.getValue(INPUT_FAMILY, null),
        HConstants.UTF8_ENCODING);
      StringBuilder newValue = new StringBuilder(originalValue);
      newValue.reverse();

      // Now set the value to be collected

      Put outval = new Put(key.get());
      outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
      output.collect(key, outval);
    }
  }

  /**
   * Test a map/reduce against a multi-region table
   * @throws IOException
   */
  public void testMultiRegionTable() throws IOException {
    runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
  }

  private void runTestOnTable(HTable table) throws IOException {
    MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);

    JobConf jobConf = null;
    try {
      LOG.info("Before map/reduce startup");
      jobConf = new JobConf(conf, TestTableMapReduce.class);
      jobConf.setJobName("process column contents");
      jobConf.setNumReduceTasks(1);
      TableMapReduceUtil.initTableMapJob(Bytes.toString(table.getTableName()),
        Bytes.toString(INPUT_FAMILY), ProcessContentsMapper.class,
        ImmutableBytesWritable.class, Put.class, jobConf);
      TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()),
        IdentityTableReduce.class, jobConf);

      LOG.info("Started " + Bytes.toString(table.getTableName()));
      JobClient.runJob(jobConf);
      LOG.info("After map/reduce completion");

      // verify map-reduce results
      verify(Bytes.toString(table.getTableName()));
    } finally {
      mrCluster.shutdown();
      if (jobConf != null) {
        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
      }
    }
  }

  private void verify(String tableName) throws IOException {
    HTable table = new HTable(conf, tableName);
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt("hbase.client.retries.number", 5);
    for (int i = 0; i < numRetries; i++) {
      try {
        LOG.info("Verification attempt #" + i);
        verifyAttempt(table);
        verified = true;
        break;
      } catch (NullPointerException e) {
        // If here, a cell was empty. Presume it's because updates came in
        // after the scanner had been opened. Wait a while and retry.
        LOG.debug("Verification attempt failed: " + e.getMessage());
      }
      try {
        Thread.sleep(pause);
      } catch (InterruptedException e) {
        // continue
      }
    }
    assertTrue(verified);
  }

  /**
   * Looks at every value of the mapreduce output and verifies that indeed
   * the values have been reversed.
   * @param table Table to scan.
   * @throws IOException
   * @throws NullPointerException if we failed to find a cell value
   */
  private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
    Scan scan = new Scan();
    scan.addColumns(columns);
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result r : scanner) {
        if (LOG.isDebugEnabled()) {
          if (r.size() > 2) {
            throw new IOException("Too many results, expected 2 got " +
              r.size());
          }
        }
        byte[] firstValue = null;
        byte[] secondValue = null;
        int count = 0;
        for (KeyValue kv : r.list()) {
          if (count == 0) {
            firstValue = kv.getValue();
          }
          if (count == 1) {
            secondValue = kv.getValue();
          }
          count++;
          if (count == 2) {
            break;
          }
        }

        String first = "";
        if (firstValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) +
            ": first value is null");
        }
        first = new String(firstValue, HConstants.UTF8_ENCODING);

        String second = "";
        if (secondValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) +
            ": second value is null");
        }
        byte[] secondReversed = new byte[secondValue.length];
        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
          secondReversed[i] = secondValue[j];
        }
        second = new String(secondReversed, HConstants.UTF8_ENCODING);

        if (first.compareTo(second) != 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("second key is not the reverse of first. row=" +
              r.getRow() + ", first value=" + first + ", second value=" +
              second);
          }
          fail();
        }
      }
    } finally {
      scanner.close();
    }
  }
}