HBASE-5259 Normalize the RegionLocation in TableInputFormat by the reverse DNS lookup (Liyin Tang)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1238774 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
650b321396
commit
b246e68435
|
@ -20,11 +20,17 @@
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import javax.naming.NamingException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HServerAddress;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
@ -36,6 +42,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
|
||||||
import org.apache.hadoop.mapreduce.JobContext;
|
import org.apache.hadoop.mapreduce.JobContext;
|
||||||
import org.apache.hadoop.mapreduce.RecordReader;
|
import org.apache.hadoop.mapreduce.RecordReader;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
|
import org.apache.hadoop.net.DNS;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A base for {@link TableInputFormat}s. Receives a {@link HTable}, an
|
* A base for {@link TableInputFormat}s. Receives a {@link HTable}, an
|
||||||
|
@ -78,6 +85,13 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
||||||
private TableRecordReader tableRecordReader = null;
|
private TableRecordReader tableRecordReader = null;
|
||||||
|
|
||||||
|
|
||||||
|
/** The reverse DNS lookup cache mapping: IPAddress => HostName */
|
||||||
|
private HashMap<InetAddress, String> reverseDNSCacheMap =
|
||||||
|
new HashMap<InetAddress, String>();
|
||||||
|
|
||||||
|
/** The NameServer address */
|
||||||
|
private String nameServer = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Builds a TableRecordReader. If no TableRecordReader was provided, uses
|
* Builds a TableRecordReader. If no TableRecordReader was provided, uses
|
||||||
* the default.
|
* the default.
|
||||||
|
@ -128,6 +142,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
||||||
if (table == null) {
|
if (table == null) {
|
||||||
throw new IOException("No table was provided.");
|
throw new IOException("No table was provided.");
|
||||||
}
|
}
|
||||||
|
// Get the name server address and the default value is null.
|
||||||
|
this.nameServer =
|
||||||
|
context.getConfiguration().get("hbase.nameserver.address", null);
|
||||||
|
|
||||||
Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
|
Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
|
||||||
if (keys == null || keys.getFirst() == null ||
|
if (keys == null || keys.getFirst() == null ||
|
||||||
keys.getFirst().length == 0) {
|
keys.getFirst().length == 0) {
|
||||||
|
@ -138,13 +156,24 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
||||||
if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
|
if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
|
HServerAddress regionServerAddress =
|
||||||
getHostname();
|
table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
|
||||||
byte[] startRow = scan.getStartRow();
|
InetAddress regionAddress =
|
||||||
byte[] stopRow = scan.getStopRow();
|
regionServerAddress.getInetSocketAddress().getAddress();
|
||||||
// determine if the given start an stop key fall into the region
|
String regionLocation;
|
||||||
|
try {
|
||||||
|
regionLocation = reverseDNS(regionAddress);
|
||||||
|
} catch (NamingException e) {
|
||||||
|
LOG.error("Cannot resolve the host name for " + regionAddress +
|
||||||
|
" because of " + e);
|
||||||
|
regionLocation = regionServerAddress.getHostname();
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] startRow = scan.getStartRow();
|
||||||
|
byte[] stopRow = scan.getStopRow();
|
||||||
|
// determine if the given start an stop key fall into the region
|
||||||
if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
|
if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
|
||||||
Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
|
Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
|
||||||
(stopRow.length == 0 ||
|
(stopRow.length == 0 ||
|
||||||
Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
|
Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
|
||||||
byte[] splitStart = startRow.length == 0 ||
|
byte[] splitStart = startRow.length == 0 ||
|
||||||
|
@ -164,6 +193,15 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
||||||
}
|
}
|
||||||
return splits;
|
return splits;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String reverseDNS(InetAddress ipAddress) throws NamingException {
|
||||||
|
String hostName = this.reverseDNSCacheMap.get(ipAddress);
|
||||||
|
if (hostName == null) {
|
||||||
|
hostName = DNS.reverseDns(ipAddress, this.nameServer);
|
||||||
|
this.reverseDNSCacheMap.put(ipAddress, hostName);
|
||||||
|
}
|
||||||
|
return hostName;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in New Issue