HBASE-18640 Move mapreduce out of hbase-server into separate module.
- Moves o.a.h.h.{mapred, mapreduce} out to the new hbase-mapreduce module, which depends on hbase-server because of classes like *Snapshot{Input,Output}Format.java, WALs, replication, etc.
- hbase-backup depends on it for WALPlayer and other MR job machinery.
- A handful of tools had to be pulled into hbase-mapreduce because of their dependencies on MR: CompactionTool, LoadTestTool, PerformanceEvaluation, and ExportSnapshot. This is a better home for them than hbase-server, although the ideal place would be a separate hbase-tools module.
- Some tests in hbase-server were digging into these tools for static utility functions or configs. These were moved to a better, easily shared place; for example, the security-related pieces went to HBaseKerberosUtils.
- Note that hbase-mapreduce has secondPartExecution tests. On my machine they took about 20 minutes, so possibly more on Apache Jenkins. That is roughly the same amount of runtime shaved off the hbase-server tests, which is a big win!

Change-Id: Ieeb7235014717ca83ee5cb13b2a27fddfa6838e8
Parent: 8d33949b8d
Commit: 664b6be0ef
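For downstream code, only the Maven artifact changes; the org.apache.hadoop.hbase.mapreduce package names stay the same. As a rough, illustrative sketch (not part of this commit — the table name and the no-op mapper below are hypothetical), a job driver like this now needs the hbase-mapreduce artifact on its classpath instead of pulling these classes from hbase-server:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class ExampleTableJob {
  /** Placeholder mapper; real logic would emit something per row. */
  static class NoopMapper extends TableMapper<Text, IntWritable> {
    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context) {
      // intentionally empty
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "example-over-hbase-table");
    job.setJarByClass(ExampleTableJob.class);
    Scan scan = new Scan();
    scan.setCacheBlocks(false); // MR scans should not churn the block cache
    // These classes keep their package but are now shipped by hbase-mapreduce.
    TableMapReduceUtil.initTableMapperJob("exampleTable", scan, NoopMapper.class,
        Text.class, IntWritable.class, job);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class); // map-only, no file output needed
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```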
@@ -195,6 +195,10 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-mapreduce</artifactId>
+    </dependency>
     <!-- To dump tools in hbase-procedure into cached_classpath.txt. -->
     <dependency>
       <groupId>org.apache.hbase</groupId>
@@ -50,6 +50,7 @@
         <include>org.apache.hbase:hbase-thrift</include>
         <include>org.apache.hbase:hbase-external-blockcache</include>
         <include>org.apache.hbase:hbase-backup</include>
+        <include>org.apache.hbase:hbase-mapreduce</include>
       </includes>
       <!-- Binaries for the dependencies also go in the hbase-jars directory -->
       <binaries>
@@ -62,6 +62,7 @@
         <include>org.apache.hbase:hbase-testing-util</include>
         <include>org.apache.hbase:hbase-thrift</include>
         <include>org.apache.hbase:hbase-backup</include>
+        <include>org.apache.hbase:hbase-mapreduce</include>
       </includes>
       <!-- Include all the sources in the top directory -->
       <sources>
@@ -107,6 +107,16 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-mapreduce</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-mapreduce</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-common</artifactId>
@@ -144,6 +144,10 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-mapreduce</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-endpoint</artifactId>
@@ -198,6 +198,22 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-client</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-mapreduce</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-mapreduce</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-mapreduce</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-rsgroup</artifactId>
@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.HFileTestUtil;
|
||||
import org.apache.hadoop.hbase.util.LoadTestTool;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -70,7 +71,7 @@ public class IntegrationTestIngest extends IntegrationTestBase {
|
|||
protected String[] LOAD_TEST_TOOL_INIT_ARGS = {
|
||||
LoadTestTool.OPT_COLUMN_FAMILIES,
|
||||
LoadTestTool.OPT_COMPRESSION,
|
||||
LoadTestTool.OPT_DATA_BLOCK_ENCODING,
|
||||
HFileTestUtil.OPT_DATA_BLOCK_ENCODING,
|
||||
LoadTestTool.OPT_INMEMORY,
|
||||
LoadTestTool.OPT_ENCRYPTION,
|
||||
LoadTestTool.OPT_NUM_REGIONS_PER_SERVER,
|
||||
|
@ -138,7 +139,7 @@ public class IntegrationTestIngest extends IntegrationTestBase {
|
|||
String familiesString = getConf().get(
|
||||
String.format("%s.%s", clazz, LoadTestTool.OPT_COLUMN_FAMILIES));
|
||||
if (familiesString == null) {
|
||||
for (byte[] family : LoadTestTool.DEFAULT_COLUMN_FAMILIES) {
|
||||
for (byte[] family : HFileTestUtil.DEFAULT_COLUMN_FAMILIES) {
|
||||
families.add(Bytes.toString(family));
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
|
|||
import org.apache.hadoop.hbase.regionserver.StoreEngine;
|
||||
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
|
||||
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
|
||||
import org.apache.hadoop.hbase.util.LoadTestTool;
|
||||
import org.apache.hadoop.hbase.util.HFileTestUtil;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
|
@ -41,7 +41,7 @@ public class IntegrationTestIngestStripeCompactions extends IntegrationTestInges
|
|||
HTableDescriptor htd = new HTableDescriptor(getTablename());
|
||||
htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
|
||||
htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "100");
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(LoadTestTool.DEFAULT_COLUMN_FAMILY);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(HFileTestUtil.DEFAULT_COLUMN_FAMILY);
|
||||
HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), htd, hcd);
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.Connection;
|
|||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.HFileTestUtil;
|
||||
import org.apache.hadoop.hbase.util.LoadTestDataGeneratorWithMOB;
|
||||
import org.apache.hadoop.hbase.util.LoadTestTool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
@ -44,7 +45,7 @@ import org.junit.experimental.categories.Category;
|
|||
public class IntegrationTestIngestWithMOB extends IntegrationTestIngest {
|
||||
private static final char COLON = ':';
|
||||
|
||||
private byte[] mobColumnFamily = LoadTestTool.DEFAULT_COLUMN_FAMILY;
|
||||
private byte[] mobColumnFamily = HFileTestUtil.DEFAULT_COLUMN_FAMILY;
|
||||
public static final String THRESHOLD = "threshold";
|
||||
public static final String MIN_MOB_DATA_SIZE = "minMobDataSize";
|
||||
public static final String MAX_MOB_DATA_SIZE = "maxMobDataSize";
|
||||
|
@ -56,7 +57,7 @@ public class IntegrationTestIngestWithMOB extends IntegrationTestIngest {
|
|||
//similar to LOAD_TEST_TOOL_INIT_ARGS except OPT_IN_MEMORY is removed
|
||||
protected String[] LOAD_TEST_TOOL_MOB_INIT_ARGS = {
|
||||
LoadTestTool.OPT_COMPRESSION,
|
||||
LoadTestTool.OPT_DATA_BLOCK_ENCODING,
|
||||
HFileTestUtil.OPT_DATA_BLOCK_ENCODING,
|
||||
LoadTestTool.OPT_ENCRYPTION,
|
||||
LoadTestTool.OPT_NUM_REGIONS_PER_SERVER,
|
||||
LoadTestTool.OPT_REGION_REPLICATION,
|
||||
|
|
|
@ -72,6 +72,7 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
|
|||
private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms
|
||||
private static final String NUM_RS_KEY = "numRs";
|
||||
private static final String NUM_RS_DEFAULT = "" + 3;
|
||||
public static final String FAMILY_NAME = "info";
|
||||
|
||||
/** Extract a descriptive statistic from a {@link com.codahale.metrics.Histogram}. */
|
||||
private enum Stat {
|
||||
|
@ -236,7 +237,7 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
|
|||
|
||||
@Override
|
||||
protected Set<String> getColumnFamilies() {
|
||||
return Sets.newHashSet(Bytes.toString(PerformanceEvaluation.FAMILY_NAME));
|
||||
return Sets.newHashSet(FAMILY_NAME);
|
||||
}
|
||||
|
||||
/** Compute the mean of the given {@code stat} from a timing results. */
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.util.Iterator;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.IntegrationTestBase;
|
|||
import org.apache.hadoop.hbase.IntegrationTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
|
||||
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
|
@ -55,7 +56,6 @@ import org.apache.hadoop.hbase.client.Scan;
|
|||
import org.apache.hadoop.hbase.client.ScannerCallable;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableMapper;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
|
||||
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
|
||||
|
|
|
@ -0,0 +1,316 @@
|
|||
<?xml version="1.0"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<!--
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
-->
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<artifactId>hbase-build-configuration</artifactId>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<version>3.0.0-SNAPSHOT</version>
|
||||
<relativePath>../hbase-build-configuration</relativePath>
|
||||
</parent>
|
||||
<artifactId>hbase-mapreduce</artifactId>
|
||||
<name>Apache HBase - MapReduce</name>
|
||||
<description>
|
||||
This module contains implementations of InputFormat, OutputFormat, Mapper, Reducer, etc which
|
||||
are needed for running MR jobs on tables, WALs, HFiles and other HBase specific constructs.
|
||||
It also contains a bunch of tools: RowCounter, ImportTsv, Import, Export, CompactionTool,
|
||||
ExportSnapshot, WALPlayer, etc
|
||||
</description>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-site-plugin</artifactId>
|
||||
<configuration>
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<!--Make it so assembly:single does nothing in here-->
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<configuration>
|
||||
<skipAssembly>true</skipAssembly>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<!-- Testing plugins -->
|
||||
<plugin>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<properties>
|
||||
<property>
|
||||
<name>listener</name>
|
||||
<value>org.apache.hadoop.hbase.ServerResourceCheckerJUnitListener</value>
|
||||
</property>
|
||||
</properties>
|
||||
<systemPropertyVariables>
|
||||
<org.apache.hadoop.hbase.shaded.io.netty.packagePrefix>org.apache.hadoop.hbase.shaded.</org.apache.hadoop.hbase.shaded.io.netty.packagePrefix>
|
||||
</systemPropertyVariables>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<!-- Make a jar and put the sources in the jar -->
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-source-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<!--This plugin's configuration is used to store Eclipse m2e settings
|
||||
only. It has no influence on the Maven build itself.-->
|
||||
<plugin>
|
||||
<groupId>org.eclipse.m2e</groupId>
|
||||
<artifactId>lifecycle-mapping</artifactId>
|
||||
<version>1.0.0</version>
|
||||
<configuration>
|
||||
<lifecycleMappingMetadata>
|
||||
<pluginExecutions>
|
||||
<pluginExecution>
|
||||
<pluginExecutionFilter>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<versionRange>[3.2,)</versionRange>
|
||||
<goals>
|
||||
<goal>compile</goal>
|
||||
</goals>
|
||||
</pluginExecutionFilter>
|
||||
<action>
|
||||
<ignore></ignore>
|
||||
</action>
|
||||
</pluginExecution>
|
||||
</pluginExecutions>
|
||||
</lifecycleMappingMetadata>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
</build>
|
||||
|
||||
<dependencies>
|
||||
<!-- Intra-project dependencies -->
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-annotations</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>jdk.tools</groupId>
|
||||
<artifactId>jdk.tools</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-annotations</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-common</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-common</artifactId>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-hadoop-compat</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-hadoop2-compat</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-server</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-server</artifactId>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<!-- General dependencies -->
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>log4j</groupId>
|
||||
<artifactId>log4j</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<profiles>
|
||||
<!-- Skip the tests in this module -->
|
||||
<profile>
|
||||
<id>skipMapReduceTests</id>
|
||||
<activation>
|
||||
<property>
|
||||
<name>skipMapReduceTests</name>
|
||||
</property>
|
||||
</activation>
|
||||
<properties>
|
||||
<surefire.skipFirstPart>true</surefire.skipFirstPart>
|
||||
<surefire.skipSecondPart>true</surefire.skipSecondPart>
|
||||
</properties>
|
||||
</profile>
|
||||
<!-- profile against Hadoop 2.x: This is the default. -->
|
||||
<profile>
|
||||
<id>hadoop-2.0</id>
|
||||
<activation>
|
||||
<property>
|
||||
<!--Below formatting for dev-support/generate-hadoopX-poms.sh-->
|
||||
<!--h2--><name>!hadoop.profile</name>
|
||||
</property>
|
||||
</activation>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.github.stephenc.findbugs</groupId>
|
||||
<artifactId>findbugs-annotations</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>net.java.dev.jets3t</groupId>
|
||||
<artifactId>jets3t</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>javax.servlet.jsp</groupId>
|
||||
<artifactId>jsp-api</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-server</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-json</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>servlet-api</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>tomcat</groupId>
|
||||
<artifactId>jasper-compiler</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>tomcat</groupId>
|
||||
<artifactId>jasper-runtime</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.google.code.findbugs</groupId>
|
||||
<artifactId>jsr305</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-minicluster</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</profile>
|
||||
|
||||
<!--
|
||||
profile for building against Hadoop 3.0.x. Activate using:
|
||||
mvn -Dhadoop.profile=3.0
|
||||
-->
|
||||
<profile>
|
||||
<id>hadoop-3.0</id>
|
||||
<activation>
|
||||
<property>
|
||||
<name>hadoop.profile</name>
|
||||
<value>3.0</value>
|
||||
</property>
|
||||
</activation>
|
||||
<properties>
|
||||
<hadoop.version>${hadoop-three.version}</hadoop.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-minicluster</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
|
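The new pom above defines the module that now ships tools such as RowCounter, Import, Export, WALPlayer and ExportSnapshot. As a hedged sketch (assuming Export keeps a public no-arg constructor, which its own main() implies), one of these tools can be driven programmatically through ToolRunner; the argument values are placeholders:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Export;
import org.apache.hadoop.util.ToolRunner;

public class ExportDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Export implements Tool, so ToolRunner also handles generic -D options.
    // Positional args are roughly: <tablename> <outputdir> [<versions> [<starttime> [<endtime>]]]
    String[] exportArgs = {"exampleTable", "/export/exampleTable"};
    System.exit(ToolRunner.run(conf, new Export(), exportArgs));
  }
}
```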
@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.Partitioner;
|
||||
|
||||
|
||||
/**
|
||||
* This is used to partition the output keys into groups of keys.
|
||||
* Keys are grouped according to the regions that currently exist
|
|
@ -54,7 +54,7 @@ import java.util.Map;
|
|||
* the overall dataset for the job is defined by the concatenation of the regions and tables
|
||||
* included in each snapshot/scan
|
||||
* pair.
|
||||
* {@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map,
|
||||
* {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map,
|
||||
* Class, Class, Class, JobConf, boolean, Path)}
|
||||
* can be used to configure the job.
|
||||
* <pre>{@code
|
|
@ -167,10 +167,10 @@ implements InputFormat<ImmutableBytesWritable, Result> {
|
|||
* Calculates the splits that will serve as input for the map tasks.
|
||||
*
|
||||
* Splits are created in number equal to the smallest between numSplits and
|
||||
* the number of {@link org.apache.hadoop.hbase.regionserver.HRegion}s in the table.
|
||||
* If the number of splits is smaller than the number of
|
||||
* the number of {@link org.apache.hadoop.hbase.regionserver.HRegion}s in the table.
|
||||
* If the number of splits is smaller than the number of
|
||||
* {@link org.apache.hadoop.hbase.regionserver.HRegion}s then splits are spanned across
|
||||
* multiple {@link org.apache.hadoop.hbase.regionserver.HRegion}s
|
||||
* multiple {@link org.apache.hadoop.hbase.regionserver.HRegion}s
|
||||
* and are grouped the most evenly possible. In the
|
||||
* case splits are uneven the bigger splits are placed first in the
|
||||
* {@link InputSplit} array.
|
|
@ -102,7 +102,7 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
|
|||
|
||||
/**
|
||||
* Creates a new record writer.
|
||||
*
|
||||
*
|
||||
* Be aware that the baseline javadoc gives the impression that there is a single
|
||||
* {@link RecordWriter} per job but in HBase, it is more natural if we give you a new
|
||||
* RecordWriter per call of this method. You must close the returned RecordWriter when done.
|
|
@ -21,6 +21,6 @@ Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce
|
|||
Input/OutputFormats, a table indexing MapReduce job, and utility methods.
|
||||
|
||||
<p>See <a href="http://hbase.apache.org/book.html#mapreduce">HBase and MapReduce</a>
|
||||
in the HBase Reference Guide for mapreduce over hbase documentation.
|
||||
in the HBase Reference Guide for mapreduce over hbase documentation.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapred;
|
|
@ -82,22 +82,22 @@ public class CopyTable extends Configured implements Tool {
|
|||
if (!doCommandLine(args)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
|
||||
job.setJarByClass(CopyTable.class);
|
||||
Scan scan = new Scan();
|
||||
|
||||
|
||||
scan.setBatch(batch);
|
||||
scan.setCacheBlocks(false);
|
||||
|
||||
|
||||
if (cacheRow > 0) {
|
||||
scan.setCaching(cacheRow);
|
||||
} else {
|
||||
scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
|
||||
}
|
||||
|
||||
|
||||
scan.setTimeRange(startTime, endTime);
|
||||
|
||||
|
||||
if (allCells) {
|
||||
scan.setRaw(true);
|
||||
}
|
||||
|
@ -259,13 +259,13 @@ public class CopyTable extends Configured implements Tool {
|
|||
endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
final String batchArgKey = "--batch=";
|
||||
if (cmd.startsWith(batchArgKey)) {
|
||||
batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
final String cacheRowArgKey = "--cacheRow=";
|
||||
if (cmd.startsWith(cacheRowArgKey)) {
|
||||
cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
|
|
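CopyTable, shown above, parses its batching and time-range options from the command line. A small illustrative driver (all table names and timestamps below are placeholders, and it assumes CopyTable's no-arg constructor, as used by its own main()):

```java
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.CopyTable;
import org.apache.hadoop.util.ToolRunner;

public class CopyTableDriver {
  public static void main(String[] args) throws Exception {
    // Copy a bounded time range of sourceTable into destTable.
    String[] copyArgs = {
        "--starttime=1500000000000",   // epoch millis, hypothetical
        "--endtime=1500003600000",
        "--new.name=destTable",
        "sourceTable"
    };
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), copyArgs));
  }
}
```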
@ -44,10 +44,10 @@ import org.apache.hadoop.util.Tool;
|
|||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
/**
|
||||
* Export an HBase table.
|
||||
* Writes content to sequence files up in HDFS. Use {@link Import} to read it
|
||||
* back in again.
|
||||
*/
|
||||
* Export an HBase table.
|
||||
* Writes content to sequence files up in HDFS. Use {@link Import} to read it
|
||||
* back in again.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
public class Export extends Configured implements Tool {
|
||||
private static final Log LOG = LogFactory.getLog(Export.class);
|
|
@ -43,7 +43,7 @@ import org.apache.hadoop.mapreduce.Partitioner;
|
|||
* <p>This class is not suitable as partitioner creating hfiles
|
||||
* for incremental bulk loads as region spread will likely change between time of
|
||||
* hfile creation and load time. See {@link LoadIncrementalHFiles}
|
||||
* and <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.
|
||||
* and <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.</p>
|
||||
*
|
||||
* @param <KEY> The type of the key.
|
||||
* @param <VALUE> The type of the value.
|
|
@ -64,27 +64,27 @@ import org.apache.hadoop.hbase.shaded.com.google.common.collect.Ordering;
|
|||
public class HashTable extends Configured implements Tool {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(HashTable.class);
|
||||
|
||||
|
||||
private static final int DEFAULT_BATCH_SIZE = 8000;
|
||||
|
||||
|
||||
private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
|
||||
final static String PARTITIONS_FILE_NAME = "partitions";
|
||||
final static String MANIFEST_FILE_NAME = "manifest";
|
||||
final static String HASH_DATA_DIR = "hashes";
|
||||
final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
|
||||
private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
|
||||
|
||||
|
||||
TableHash tableHash = new TableHash();
|
||||
Path destPath;
|
||||
|
||||
|
||||
public HashTable(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
||||
public static class TableHash {
|
||||
|
||||
|
||||
Path hashDir;
|
||||
|
||||
|
||||
String tableName;
|
||||
String families = null;
|
||||
long batchSize = DEFAULT_BATCH_SIZE;
|
||||
|
@ -95,9 +95,9 @@ public class HashTable extends Configured implements Tool {
|
|||
int versions = -1;
|
||||
long startTime = 0;
|
||||
long endTime = 0;
|
||||
|
||||
|
||||
List<ImmutableBytesWritable> partitions;
|
||||
|
||||
|
||||
public static TableHash read(Configuration conf, Path hashDir) throws IOException {
|
||||
TableHash tableHash = new TableHash();
|
||||
FileSystem fs = hashDir.getFileSystem(conf);
|
||||
|
@ -106,7 +106,7 @@ public class HashTable extends Configured implements Tool {
|
|||
tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
|
||||
return tableHash;
|
||||
}
|
||||
|
||||
|
||||
void writePropertiesFile(FileSystem fs, Path path) throws IOException {
|
||||
Properties p = new Properties();
|
||||
p.setProperty("table", tableName);
|
||||
|
@ -133,7 +133,7 @@ public class HashTable extends Configured implements Tool {
|
|||
if (endTime != 0) {
|
||||
p.setProperty("endTimestamp", Long.toString(endTime));
|
||||
}
|
||||
|
||||
|
||||
try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
|
||||
p.store(osw, null);
|
||||
}
|
||||
|
@ -150,7 +150,7 @@ public class HashTable extends Configured implements Tool {
|
|||
families = p.getProperty("columnFamilies");
|
||||
batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
|
||||
numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));
|
||||
|
||||
|
||||
String startRowHex = p.getProperty("startRowHex");
|
||||
if (startRowHex != null) {
|
||||
startRow = Bytes.fromHex(startRowHex);
|
||||
|
@ -159,28 +159,28 @@ public class HashTable extends Configured implements Tool {
|
|||
if (stopRowHex != null) {
|
||||
stopRow = Bytes.fromHex(stopRowHex);
|
||||
}
|
||||
|
||||
|
||||
String scanBatchString = p.getProperty("scanBatch");
|
||||
if (scanBatchString != null) {
|
||||
scanBatch = Integer.parseInt(scanBatchString);
|
||||
}
|
||||
|
||||
|
||||
String versionString = p.getProperty("versions");
|
||||
if (versionString != null) {
|
||||
versions = Integer.parseInt(versionString);
|
||||
}
|
||||
|
||||
|
||||
String startTimeString = p.getProperty("startTimestamp");
|
||||
if (startTimeString != null) {
|
||||
startTime = Long.parseLong(startTimeString);
|
||||
}
|
||||
|
||||
|
||||
String endTimeString = p.getProperty("endTimestamp");
|
||||
if (endTimeString != null) {
|
||||
endTime = Long.parseLong(endTimeString);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Scan initScan() throws IOException {
|
||||
Scan scan = new Scan();
|
||||
scan.setCacheBlocks(false);
|
||||
|
@ -206,7 +206,7 @@ public class HashTable extends Configured implements Tool {
|
|||
}
|
||||
return scan;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Choose partitions between row ranges to hash to a single output file
|
||||
* Selects region boundaries that fall within the scan range, and groups them
|
||||
|
@ -217,7 +217,7 @@ public class HashTable extends Configured implements Tool {
|
|||
for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
|
||||
byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
|
||||
byte[] regionEndKey = regionStartEndKeys.getSecond()[i];
|
||||
|
||||
|
||||
// if scan begins after this region, or starts before this region, then drop this region
|
||||
// in other words:
|
||||
// IF (scan begins before the end of this region
|
||||
|
@ -230,7 +230,7 @@ public class HashTable extends Configured implements Tool {
|
|||
startKeys.add(regionStartKey);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int numRegions = startKeys.size();
|
||||
if (numHashFiles == 0) {
|
||||
numHashFiles = numRegions / 100;
|
||||
|
@ -242,7 +242,7 @@ public class HashTable extends Configured implements Tool {
|
|||
// can't partition within regions
|
||||
numHashFiles = numRegions;
|
||||
}
|
||||
|
||||
|
||||
// choose a subset of start keys to group regions into ranges
|
||||
partitions = new ArrayList<>(numHashFiles - 1);
|
||||
// skip the first start key as it is not a partition between ranges.
|
||||
|
@ -251,19 +251,19 @@ public class HashTable extends Configured implements Tool {
|
|||
partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void writePartitionFile(Configuration conf, Path path) throws IOException {
|
||||
FileSystem fs = path.getFileSystem(conf);
|
||||
@SuppressWarnings("deprecation")
|
||||
SequenceFile.Writer writer = SequenceFile.createWriter(
|
||||
fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
|
||||
|
||||
|
||||
for (int i = 0; i < partitions.size(); i++) {
|
||||
writer.append(partitions.get(i), NullWritable.get());
|
||||
}
|
||||
writer.close();
|
||||
}
|
||||
|
||||
|
||||
private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
|
||||
throws IOException {
|
||||
@SuppressWarnings("deprecation")
|
||||
|
@ -274,7 +274,7 @@ public class HashTable extends Configured implements Tool {
|
|||
partitions.add(new ImmutableBytesWritable(key.copyBytes()));
|
||||
}
|
||||
reader.close();
|
||||
|
||||
|
||||
if (!Ordering.natural().isOrdered(partitions)) {
|
||||
throw new IOException("Partitions are not ordered!");
|
||||
}
|
||||
|
@ -309,30 +309,30 @@ public class HashTable extends Configured implements Tool {
|
|||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
|
||||
static String getDataFileName(int hashFileIndex) {
|
||||
return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Open a TableHash.Reader starting at the first hash at or after the given key.
|
||||
* @throws IOException
|
||||
* @throws IOException
|
||||
*/
|
||||
public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
|
||||
throws IOException {
|
||||
return new Reader(conf, startKey);
|
||||
}
|
||||
|
||||
|
||||
public class Reader implements java.io.Closeable {
|
||||
private final Configuration conf;
|
||||
|
||||
|
||||
private int hashFileIndex;
|
||||
private MapFile.Reader mapFileReader;
|
||||
|
||||
|
||||
private boolean cachedNext;
|
||||
private ImmutableBytesWritable key;
|
||||
private ImmutableBytesWritable hash;
|
||||
|
||||
|
||||
Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
|
||||
this.conf = conf;
|
||||
int partitionIndex = Collections.binarySearch(partitions, startKey);
|
||||
|
@ -344,7 +344,7 @@ public class HashTable extends Configured implements Tool {
|
|||
hashFileIndex = -1-partitionIndex;
|
||||
}
|
||||
openHashFile();
|
||||
|
||||
|
||||
// MapFile's don't make it easy to seek() so that the subsequent next() returns
|
||||
// the desired key/value pair. So we cache it for the first call of next().
|
||||
hash = new ImmutableBytesWritable();
|
||||
|
@ -356,7 +356,7 @@ public class HashTable extends Configured implements Tool {
|
|||
cachedNext = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Read the next key/hash pair.
|
||||
* Returns true if such a pair exists and false when at the end of the data.
|
||||
|
@ -384,7 +384,7 @@ public class HashTable extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the current key
|
||||
* @return the current key or null if there is no current key
|
||||
|
@ -392,7 +392,7 @@ public class HashTable extends Configured implements Tool {
|
|||
public ImmutableBytesWritable getCurrentKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the current hash
|
||||
* @return the current hash or null if there is no current hash
|
||||
|
@ -400,7 +400,7 @@ public class HashTable extends Configured implements Tool {
|
|||
public ImmutableBytesWritable getCurrentHash() {
|
||||
return hash;
|
||||
}
|
||||
|
||||
|
||||
private void openHashFile() throws IOException {
|
||||
if (mapFileReader != null) {
|
||||
mapFileReader.close();
|
||||
|
@ -416,19 +416,19 @@ public class HashTable extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static boolean isTableStartRow(byte[] row) {
|
||||
return Bytes.equals(HConstants.EMPTY_START_ROW, row);
|
||||
}
|
||||
|
||||
|
||||
static boolean isTableEndRow(byte[] row) {
|
||||
return Bytes.equals(HConstants.EMPTY_END_ROW, row);
|
||||
}
|
||||
|
||||
|
||||
public Job createSubmittableJob(String[] args) throws IOException {
|
||||
Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
|
||||
generatePartitions(partitionsPath);
|
||||
|
||||
|
||||
Job job = Job.getInstance(getConf(),
|
||||
getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
|
||||
Configuration jobConf = job.getConfiguration();
|
||||
|
@ -437,7 +437,7 @@ public class HashTable extends Configured implements Tool {
|
|||
|
||||
TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
|
||||
HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
|
||||
|
||||
|
||||
// use a TotalOrderPartitioner and reducers to group region output into hash files
|
||||
job.setPartitionerClass(TotalOrderPartitioner.class);
|
||||
TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
|
||||
|
@ -447,31 +447,31 @@ public class HashTable extends Configured implements Tool {
|
|||
job.setOutputValueClass(ImmutableBytesWritable.class);
|
||||
job.setOutputFormatClass(MapFileOutputFormat.class);
|
||||
FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));
|
||||
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
|
||||
private void generatePartitions(Path partitionsPath) throws IOException {
|
||||
Connection connection = ConnectionFactory.createConnection(getConf());
|
||||
Pair<byte[][], byte[][]> regionKeys
|
||||
= connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys();
|
||||
connection.close();
|
||||
|
||||
|
||||
tableHash.selectPartitions(regionKeys);
|
||||
LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);
|
||||
|
||||
|
||||
tableHash.writePartitionFile(getConf(), partitionsPath);
|
||||
}
|
||||
|
||||
|
||||
static class ResultHasher {
|
||||
private MessageDigest digest;
|
||||
|
||||
|
||||
private boolean batchStarted = false;
|
||||
private ImmutableBytesWritable batchStartKey;
|
||||
private ImmutableBytesWritable batchHash;
|
||||
private long batchSize = 0;
|
||||
|
||||
|
||||
|
||||
|
||||
public ResultHasher() {
|
||||
try {
|
||||
digest = MessageDigest.getInstance("MD5");
|
||||
|
@ -479,7 +479,7 @@ public class HashTable extends Configured implements Tool {
|
|||
Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void startBatch(ImmutableBytesWritable row) {
|
||||
if (batchStarted) {
|
||||
throw new RuntimeException("Cannot start new batch without finishing existing one.");
|
||||
|
@ -489,7 +489,7 @@ public class HashTable extends Configured implements Tool {
|
|||
batchStartKey = row;
|
||||
batchHash = null;
|
||||
}
|
||||
|
||||
|
||||
public void hashResult(Result result) {
|
||||
if (!batchStarted) {
|
||||
throw new RuntimeException("Cannot add to batch that has not been started.");
|
||||
|
@ -508,11 +508,11 @@ public class HashTable extends Configured implements Tool {
|
|||
ts >>>= 8;
|
||||
}
|
||||
digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);
|
||||
|
||||
|
||||
batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void finishBatch() {
|
||||
if (!batchStarted) {
|
||||
throw new RuntimeException("Cannot finish batch that has not started.");
|
||||
|
@ -537,39 +537,39 @@ public class HashTable extends Configured implements Tool {
|
|||
return batchSize;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static class HashMapper
|
||||
extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
|
||||
|
||||
|
||||
private ResultHasher hasher;
|
||||
private long targetBatchSize;
|
||||
|
||||
|
||||
private ImmutableBytesWritable currentRow;
|
||||
|
||||
|
||||
@Override
|
||||
protected void setup(Context context) throws IOException, InterruptedException {
|
||||
targetBatchSize = context.getConfiguration()
|
||||
.getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
|
||||
hasher = new ResultHasher();
|
||||
|
||||
|
||||
TableSplit split = (TableSplit) context.getInputSplit();
|
||||
hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void map(ImmutableBytesWritable key, Result value, Context context)
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
|
||||
if (currentRow == null || !currentRow.equals(key)) {
|
||||
currentRow = new ImmutableBytesWritable(key); // not immutable
|
||||
|
||||
|
||||
if (hasher.getBatchSize() >= targetBatchSize) {
|
||||
hasher.finishBatch();
|
||||
context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
|
||||
hasher.startBatch(currentRow);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
hasher.hashResult(value);
|
||||
}
|
||||
|
||||
|
@ -579,20 +579,20 @@ public class HashTable extends Configured implements Tool {
|
|||
context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void writeTempManifestFile() throws IOException {
|
||||
Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
|
||||
FileSystem fs = tempManifestPath.getFileSystem(getConf());
|
||||
tableHash.writePropertiesFile(fs, tempManifestPath);
|
||||
}
|
||||
|
||||
|
||||
private void completeManifest() throws IOException {
|
||||
Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
|
||||
Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
|
||||
FileSystem fs = tempManifestPath.getFileSystem(getConf());
|
||||
fs.rename(tempManifestPath, manifestPath);
|
||||
}
|
||||
|
||||
|
||||
private static final int NUM_ARGS = 2;
|
||||
private static void printUsage(final String errorMsg) {
|
||||
if (errorMsg != null && errorMsg.length() > 0) {
|
||||
|
@ -636,41 +636,41 @@ public class HashTable extends Configured implements Tool {
|
|||
return false;
|
||||
}
|
||||
try {
|
||||
|
||||
|
||||
tableHash.tableName = args[args.length-2];
|
||||
destPath = new Path(args[args.length-1]);
|
||||
|
||||
|
||||
for (int i = 0; i < args.length - NUM_ARGS; i++) {
|
||||
String cmd = args[i];
|
||||
if (cmd.equals("-h") || cmd.startsWith("--h")) {
|
||||
printUsage(null);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
final String batchSizeArgKey = "--batchsize=";
|
||||
if (cmd.startsWith(batchSizeArgKey)) {
|
||||
tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
final String numHashFilesArgKey = "--numhashfiles=";
|
||||
if (cmd.startsWith(numHashFilesArgKey)) {
|
||||
tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
final String startRowArgKey = "--startrow=";
|
||||
if (cmd.startsWith(startRowArgKey)) {
|
||||
tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
final String stopRowArgKey = "--stoprow=";
|
||||
if (cmd.startsWith(stopRowArgKey)) {
|
||||
tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
final String startTimeArgKey = "--starttime=";
|
||||
if (cmd.startsWith(startTimeArgKey)) {
|
||||
tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
|
||||
|
@ -710,7 +710,7 @@ public class HashTable extends Configured implements Tool {
|
|||
+ tableHash.startTime + " >= endtime=" + tableHash.endTime);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
printUsage("Can't start because " + e.getMessage());
|
|
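HashTable, shown above, takes <tablename> <outputpath> as its positional arguments plus the option keys parsed in doCommandLine(). A minimal sketch of driving it programmatically (the table name, paths and option values are placeholders):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HashTable;
import org.apache.hadoop.util.ToolRunner;

public class HashTableDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] hashArgs = {
        "--batchsize=8000",     // bytes of cell data hashed per key/hash pair
        "--numhashfiles=4",     // number of reducers / hash output files
        "TestTable",            // hypothetical source table
        "/hashes/TestTable"     // hypothetical output directory in HDFS
    };
    System.exit(ToolRunner.run(conf, new HashTable(conf), hashArgs));
  }
}
```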
@ -57,8 +57,8 @@ extends TableReducer<Writable, Mutation, Writable> {
|
|||
|
||||
/**
|
||||
* Writes each given record, consisting of the row key and the given values,
|
||||
* to the configured {@link org.apache.hadoop.mapreduce.OutputFormat}.
|
||||
* It is emitting the row key and each {@link org.apache.hadoop.hbase.client.Put Put}
|
||||
* to the configured {@link org.apache.hadoop.mapreduce.OutputFormat}.
|
||||
* It is emitting the row key and each {@link org.apache.hadoop.hbase.client.Put Put}
|
||||
* or {@link org.apache.hadoop.hbase.client.Delete Delete} as separate pairs.
|
||||
*
|
||||
* @param key The current row key.
|
|
@ -96,7 +96,7 @@ public class Import extends Configured implements Tool {
|
|||
|
||||
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
|
||||
|
||||
public static class KeyValueWritableComparablePartitioner
|
||||
public static class KeyValueWritableComparablePartitioner
|
||||
extends Partitioner<KeyValueWritableComparable, KeyValue> {
|
||||
private static KeyValueWritableComparable[] START_KEYS = null;
|
||||
@Override
|
||||
|
@ -109,27 +109,27 @@ public class Import extends Configured implements Tool {
|
|||
}
|
||||
return START_KEYS.length;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
public static class KeyValueWritableComparable
|
||||
|
||||
public static class KeyValueWritableComparable
|
||||
implements WritableComparable<KeyValueWritableComparable> {
|
||||
|
||||
private KeyValue kv = null;
|
||||
|
||||
static {
|
||||
|
||||
static {
|
||||
// register this comparator
|
||||
WritableComparator.define(KeyValueWritableComparable.class,
|
||||
WritableComparator.define(KeyValueWritableComparable.class,
|
||||
new KeyValueWritableComparator());
|
||||
}
|
||||
|
||||
|
||||
public KeyValueWritableComparable() {
|
||||
}
|
||||
|
||||
|
||||
public KeyValueWritableComparable(KeyValue kv) {
|
||||
this.kv = kv;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
KeyValue.write(kv, out);
|
||||
|
@ -146,7 +146,7 @@ public class Import extends Configured implements Tool {
|
|||
public int compareTo(KeyValueWritableComparable o) {
|
||||
return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable)o).kv);
|
||||
}
|
||||
|
||||
|
||||
public static class KeyValueWritableComparator extends WritableComparator {
|
||||
|
||||
@Override
|
||||
|
@ -159,13 +159,13 @@ public class Import extends Configured implements Tool {
|
|||
return compare(kv1, kv2);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
public static class KeyValueReducer
|
||||
extends
|
||||
Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
|
||||
|
@ -180,12 +180,12 @@ public class Import extends Configured implements Tool {
|
|||
context.write(new ImmutableBytesWritable(kv.getRowArray()), kv);
|
||||
if (++index % 100 == 0)
|
||||
context.setStatus("Wrote " + index + " KeyValues, "
|
||||
+ "and the rowkey whose is being wrote is " + Bytes.toString(kv.getRowArray()));
|
||||
+ "and the rowkey whose is being wrote is " + Bytes.toString(kv.getRowArray()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class KeyValueSortImporter
|
||||
|
||||
public static class KeyValueSortImporter
|
||||
extends TableMapper<KeyValueWritableComparable, KeyValue> {
|
||||
private Map<byte[], byte[]> cfRenameMap;
|
||||
private Filter filter;
|
||||
|
@ -215,16 +215,16 @@ public class Import extends Configured implements Tool {
|
|||
if (kv == null) continue;
|
||||
// TODO get rid of ensureKeyValue
|
||||
KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
|
||||
context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
|
||||
context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void setup(Context context) throws IOException {
|
||||
public void setup(Context context) throws IOException {
|
||||
cfRenameMap = createCfRenameMap(context.getConfiguration());
|
||||
filter = instantiateFilter(context.getConfiguration());
|
||||
int reduceNum = context.getNumReduceTasks();
|
||||
|
@ -236,17 +236,17 @@ public class Import extends Configured implements Tool {
|
|||
if (startKeys.length != reduceNum) {
|
||||
throw new IOException("Region split after job initialization");
|
||||
}
|
||||
KeyValueWritableComparable[] startKeyWraps =
|
||||
KeyValueWritableComparable[] startKeyWraps =
|
||||
new KeyValueWritableComparable[startKeys.length - 1];
|
||||
for (int i = 1; i < startKeys.length; ++i) {
|
||||
startKeyWraps[i - 1] =
|
||||
startKeyWraps[i - 1] =
|
||||
new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
|
||||
}
|
||||
KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A mapper that just writes out KeyValues.
|
||||
*/
|
||||
|
@ -438,7 +438,7 @@ public class Import extends Configured implements Tool {
|
|||
* @throws IllegalArgumentException if the filter is misconfigured
|
||||
*/
|
||||
public static Filter instantiateFilter(Configuration conf) {
|
||||
// get the filter, if it was configured
|
||||
// get the filter, if it was configured
|
||||
Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
|
||||
if (filterClass == null) {
|
||||
LOG.debug("No configured filter class, accepting all keyvalues.");
|
||||
|
@ -506,18 +506,18 @@ public class Import extends Configured implements Tool {
|
|||
// If there's a rename mapping for this CF, create a new KeyValue
|
||||
byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
|
||||
if(newCfName != null) {
|
||||
kv = new KeyValue(kv.getRowArray(), // row buffer
|
||||
kv = new KeyValue(kv.getRowArray(), // row buffer
|
||||
kv.getRowOffset(), // row offset
|
||||
kv.getRowLength(), // row length
|
||||
newCfName, // CF buffer
|
||||
0, // CF offset
|
||||
newCfName.length, // CF length
|
||||
0, // CF offset
|
||||
newCfName.length, // CF length
|
||||
kv.getQualifierArray(), // qualifier buffer
|
||||
kv.getQualifierOffset(), // qualifier offset
|
||||
kv.getQualifierLength(), // qualifier length
|
||||
kv.getTimestamp(), // timestamp
|
||||
KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
|
||||
kv.getValueArray(), // value buffer
|
||||
kv.getValueArray(), // value buffer
|
||||
kv.getValueOffset(), // value offset
|
||||
kv.getValueLength()); // value length
|
||||
}
|
||||
|
@ -549,26 +549,26 @@ public class Import extends Configured implements Tool {
|
|||
/**
|
||||
* <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
|
||||
* the mapper how to rename column families.
|
||||
*
|
||||
* <p>Alternately, instead of calling this function, you could set the configuration key
|
||||
* {@link #CF_RENAME_PROP} yourself. The value should look like
|
||||
*
|
||||
* <p>Alternately, instead of calling this function, you could set the configuration key
|
||||
* {@link #CF_RENAME_PROP} yourself. The value should look like
|
||||
* <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on
|
||||
* the mapper behavior.
|
||||
*
|
||||
*
|
||||
* @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be
|
||||
* set
|
||||
* @param renameMap a mapping from source CF names to destination CF names
|
||||
*/
|
||||
static public void configureCfRenaming(Configuration conf,
|
||||
static public void configureCfRenaming(Configuration conf,
|
||||
Map<String, String> renameMap) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for(Map.Entry<String,String> entry: renameMap.entrySet()) {
|
||||
String sourceCf = entry.getKey();
|
||||
String destCf = entry.getValue();
|
||||
|
||||
if(sourceCf.contains(":") || sourceCf.contains(",") ||
|
||||
if(sourceCf.contains(":") || sourceCf.contains(",") ||
|
||||
destCf.contains(":") || destCf.contains(",")) {
|
||||
throw new IllegalArgumentException("Illegal character in CF names: "
|
||||
throw new IllegalArgumentException("Illegal character in CF names: "
|
||||
+ sourceCf + ", " + destCf);
|
||||
}
|
||||
|
||||
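As the javadoc above notes, configureCfRenaming(Configuration, Map) is just a convenience for setting CF_RENAME_PROP by hand. A tiny illustration (the family names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Import;

public class CfRenameExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    Map<String, String> renames = new HashMap<>();
    renames.put("oldCf", "newCf"); // equivalent to CF_RENAME_PROP = "oldCf:newCf"
    Import.configureCfRenaming(conf, renames);
  }
}
```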
|
@ -632,10 +632,10 @@ public class Import extends Configured implements Tool {
|
|||
FileOutputFormat.setOutputPath(job, outputDir);
|
||||
job.setMapOutputKeyClass(KeyValueWritableComparable.class);
|
||||
job.setMapOutputValueClass(KeyValue.class);
|
||||
job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
|
||||
job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
|
||||
KeyValueWritableComparable.KeyValueWritableComparator.class,
|
||||
RawComparator.class);
|
||||
Path partitionsPath =
|
||||
Path partitionsPath =
|
||||
new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
|
||||
FileSystem fs = FileSystem.get(job.getConfiguration());
|
||||
fs.deleteOnExit(partitionsPath);
|
||||
|
@ -647,7 +647,7 @@ public class Import extends Configured implements Tool {
|
|||
} else if (hfileOutPath != null) {
|
||||
LOG.info("writing to hfiles for bulk load.");
|
||||
job.setMapperClass(KeyValueImporter.class);
|
||||
try (Connection conn = ConnectionFactory.createConnection(conf);
|
||||
try (Connection conn = ConnectionFactory.createConnection(conf);
|
||||
Table table = conn.getTable(tableName);
|
||||
RegionLocator regionLocator = conn.getRegionLocator(tableName)){
|
||||
job.setReducerClass(KeyValueSortReducer.class);
|
|
@ -34,9 +34,10 @@ import org.apache.hadoop.mapreduce.Reducer;
|
|||
* @see HFileOutputFormat2
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
|
||||
protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,
|
||||
org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
|
||||
public class KeyValueSortReducer
|
||||
extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
|
||||
protected void reduce(ImmutableBytesWritable row, Iterable<KeyValue> kvs,
|
||||
Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
|
||||
throws java.io.IOException, InterruptedException {
|
||||
TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR);
|
||||
for (KeyValue kv: kvs) {
|
|
@ -100,7 +100,7 @@ public class MultiTableHFileOutputFormat extends HFileOutputFormat2 {
|
|||
|
||||
final private static int validateCompositeKey(byte[] keyBytes) {
|
||||
|
||||
int separatorIdx = Bytes.indexOf(keyBytes, HFileOutputFormat2.tableSeparator);
|
||||
int separatorIdx = Bytes.indexOf(keyBytes, tableSeparator);
|
||||
|
||||
// Either the separator was not found or a tablename wasn't present or a key wasn't present
|
||||
if (separatorIdx == -1) {
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.client.Scan;
|
||||
|
||||
/**
|
||||
* Convert HBase tabular data from multiple scanners into a format that
|
||||
* Convert HBase tabular data from multiple scanners into a format that
|
||||
* is consumable by Map/Reduce.
|
||||
*
|
||||
* <p>
|
||||
|
@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.client.Scan;
|
|||
*
|
||||
* <pre>
|
||||
* List<Scan> scans = new ArrayList<Scan>();
|
||||
*
|
||||
*
|
||||
* Scan scan1 = new Scan();
|
||||
* scan1.setStartRow(firstRow1);
|
||||
* scan1.setStopRow(lastRow1);
|
|
@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.client.Table;
|
|||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.RegionSizeCalculator;
|
||||
import org.apache.hadoop.mapreduce.InputFormat;
|
||||
import org.apache.hadoop.mapreduce.InputSplit;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
|
@@ -33,13 +33,13 @@ import java.util.Map;

 /**
  * MultiTableSnapshotInputFormat generalizes
- * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * {@link TableSnapshotInputFormat}
  * allowing a MapReduce job to run over one or more table snapshots, with one or more scans
  * configured for each.
  * Internally, the input format delegates to
- * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * {@link TableSnapshotInputFormat}
  * and thus has the same performance advantages;
- * see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for
+ * see {@link TableSnapshotInputFormat} for
  * more details.
  * Usage is similar to TableSnapshotInputFormat, with the following exception:
  * initMultiTableSnapshotMapperJob takes in a map
@@ -48,7 +48,7 @@ import java.util.Map;
  * the overall dataset for the job is defined by the concatenation of the regions and tables
  * included in each snapshot/scan
  * pair.
- * {@link org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#initMultiTableSnapshotMapperJob
+ * {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob
  * (java.util.Map, Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean, org.apache
  * .hadoop.fs.Path)}
  * can be used to configure the job.
@@ -69,11 +69,11 @@ import java.util.Map;
  * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce
  * .TableSnapshotInputFormat}
  * (one per region).
- * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
+ * See {@link TableSnapshotInputFormat} for more notes on
  * permissioning; the
  * same caveats apply here.
  *
- * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
+ * @see TableSnapshotInputFormat
  * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
  */
 @InterfaceAudience.Public
@@ -253,7 +253,7 @@ public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
         c.setAccessible(true);
         subcontext = (Context) c.newInstance(
           mapper,
-          outer.getConfiguration(),
+          outer.getConfiguration(),
           outer.getTaskAttemptID(),
           new SubMapRecordReader(),
           new SubMapRecordWriter(),
@@ -272,7 +272,7 @@ public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
           InputSplit.class);
         c.setAccessible(true);
         MapContext mc = (MapContext) c.newInstance(
-          outer.getConfiguration(),
+          outer.getConfiguration(),
           outer.getTaskAttemptID(),
           new SubMapRecordReader(),
           new SubMapRecordWriter(),
@@ -67,7 +67,7 @@ public class MutationSerialization implements Serialization<Mutation> {
     public void open(InputStream in) throws IOException {
       this.in = in;
     }

   }
   private static class MutationSerializer implements Serializer<Mutation> {
     private OutputStream out;
@@ -15,12 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.util;
+package org.apache.hadoop.hbase.mapreduce;

 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -33,18 +32,14 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;

 /**
  * Computes size of each region for given table and given column families.
  * The value is used by MapReduce for better scheduling.
  * */
-@InterfaceStability.Evolving
 @InterfaceAudience.Private
 public class RegionSizeCalculator {

@@ -58,20 +53,6 @@ public class RegionSizeCalculator {
   static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
   private static final long MEGABYTE = 1024L * 1024L;

-  /**
-   * Computes size of each region for table and given column families.
-   *
-   * @deprecated Use {@link #RegionSizeCalculator(RegionLocator, Admin)} instead.
-   */
-  @Deprecated
-  public RegionSizeCalculator(Table table) throws IOException {
-    try (Connection conn = ConnectionFactory.createConnection(table.getConfiguration());
-        RegionLocator locator = conn.getRegionLocator(table.getName());
-        Admin admin = conn.getAdmin()) {
-      init(locator, admin);
-    }
-  }
-
   /**
    * Computes size of each region for table and given column families.
    * */
@@ -52,10 +52,10 @@ implements Configurable {
   public static final String START = "hbase.simpletotalorder.start";
   @Deprecated
   public static final String END = "hbase.simpletotalorder.end";

   static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
   static final String END_BASE64 = "hbase.simpletotalorder.end.base64";

   private Configuration c;
   private byte [] startkey;
   private byte [] endkey;
@@ -65,21 +65,21 @@ implements Configurable {
   public static void setStartKey(Configuration conf, byte[] startKey) {
     conf.set(START_BASE64, Base64.encodeBytes(startKey));
   }

   public static void setEndKey(Configuration conf, byte[] endKey) {
     conf.set(END_BASE64, Base64.encodeBytes(endKey));
   }

   @SuppressWarnings("deprecation")
   static byte[] getStartKey(Configuration conf) {
     return getKeyFromConf(conf, START_BASE64, START);
   }

   @SuppressWarnings("deprecation")
   static byte[] getEndKey(Configuration conf) {
     return getKeyFromConf(conf, END_BASE64, END);
   }

   private static byte[] getKeyFromConf(Configuration conf,
       String base64Key, String deprecatedKey) {
     String encoded = conf.get(base64Key);
@@ -94,7 +94,7 @@ implements Configurable {
       " - please use static accessor methods instead.");
     return Bytes.toBytesBinary(oldStyleVal);
   }

   @Override
   public int getPartition(final ImmutableBytesWritable key, final VALUE value,
       final int reduces) {
@@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.RegionSizeCalculator;
 import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -133,7 +132,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   /** The underlying {@link Connection} of the table. */
   private Connection connection;

   /** The reverse DNS lookup cache mapping: IPAddress => HostName */
   private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>();

@@ -248,9 +247,9 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     try {
       RegionSizeCalculator sizeCalculator =
           new RegionSizeCalculator(getRegionLocator(), getAdmin());

       TableName tableName = getTable().getName();

       Pair<byte[][], byte[][]> keys = getStartEndKeys();
       if (keys == null || keys.getFirst() == null ||
           keys.getFirst().length == 0) {
@@ -544,7 +543,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     }
     return regionLocator;
   }

   /**
    * Allows subclasses to get the {@link Table}.
    */
@@ -569,8 +568,8 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    * Allows subclasses to initialize the table information.
    *
    * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close.
-   * @param tableName The {@link TableName} of the table to process.
-   * @throws IOException
+   * @param tableName The {@link TableName} of the table to process.
+   * @throws IOException
    */
   protected void initializeTable(Connection connection, TableName tableName) throws IOException {
     if (this.table != null || this.connection != null) {
@@ -611,7 +610,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   protected void setTableRecordReader(TableRecordReader tableRecordReader) {
     this.tableRecordReader = tableRecordReader;
   }

   /**
    * Handle subclass specific set up.
    * Each of the entry points used by the MapReduce framework,
@@ -98,8 +98,8 @@ implements Configurable {
     private BufferedMutator mutator;

     /**
-     * @throws IOException
-     *
+     * @throws IOException
+     *
      */
     public TableRecordWriter() throws IOException {
       String tableName = conf.get(OUTPUT_TABLE);
@@ -147,7 +147,7 @@ implements Configurable {

   /**
    * Creates a new record writer.
-   *
+   *
    * Be aware that the baseline javadoc gives the impression that there is a single
    * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new
    * RecordWriter per call of this method. You must close the returned RecordWriter when done.
@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
 @InterfaceAudience.Public
 public class TableRecordReaderImpl {
   public static final String LOG_PER_ROW_COUNT
-      = "hbase.mapreduce.log.scanner.rowcount";
+      = "hbase.mapreduce.log.scanner.rowcount";

   private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);

@@ -48,13 +48,12 @@ import java.util.List;
  * wals, etc) directly to provide maximum performance. The snapshot is not required to be
  * restored to the live cluster or cloned. This also allows to run the mapreduce job from an
  * online or offline hbase cluster. The snapshot files can be exported by using the
- * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, to a pure-hdfs cluster,
- * and this InputFormat can be used to run the mapreduce job directly over the snapshot files.
+ * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, to a pure-hdfs cluster,
+ * and this InputFormat can be used to run the mapreduce job directly over the snapshot files.
  * The snapshot should not be deleted while there are jobs reading from snapshot files.
  * <p>
  * Usage is similar to TableInputFormat, and
- * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job,
- * boolean, Path)}
+ * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)}
  * can be used to configure the job.
  * <pre>{@code
  * Job job = new Job(conf);
@@ -67,7 +66,7 @@ import java.util.List;
  * <p>
  * Internally, this input format restores the snapshot into the given tmp directory. Similar to
  * {@link TableInputFormat} an InputSplit is created per region. The region is opened for reading
- * from each RecordReader. An internal RegionScanner is used to execute the
+ * from each RecordReader. An internal RegionScanner is used to execute the
  * {@link org.apache.hadoop.hbase.CellScanner} obtained from the user.
  * <p>
  * HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read from
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Result;
@@ -60,7 +59,6 @@ import java.util.UUID;
  * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
  */
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
 public class TableSnapshotInputFormatImpl {
   // TODO: Snapshots files are owned in fs by the hbase user. There is no
   // easy way to delegate access.
@@ -51,7 +51,7 @@ import org.apache.hadoop.util.StringUtils;
 @InterfaceAudience.Public
 public class TextSortReducer extends
     Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {

   /** Timestamp for all inserted rows */
   private long ts;

@@ -60,7 +60,7 @@ public class TextSortReducer extends

   /** Should skip bad lines */
   private boolean skipBadLines;

   private Counter badLineCount;

   private ImportTsv.TsvParser parser;
@@ -130,7 +130,7 @@ public class TextSortReducer extends
     skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
   }

   @Override
   protected void reduce(
       ImmutableBytesWritable rowKey,
@@ -156,7 +156,7 @@ public class TextSortReducer extends
       ts = parsed.getTimestamp(ts);
       cellVisibilityExpr = parsed.getCellVisibility();
       ttl = parsed.getCellTTL();

       // create tags for the parsed line
       List<Tag> tags = new ArrayList<>();
       if (cellVisibilityExpr != null) {
@@ -75,7 +75,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>

   /** List of cell tags */
   private List<Tag> tags;

   public long getTs() {
     return ts;
   }
@@ -180,7 +180,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
       for (int i = 0; i < parsed.getColumnCount(); i++) {
         if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
             || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
-            || i == parser.getCellTTLColumnIndex() || (skipEmptyColumns
+            || i == parser.getCellTTLColumnIndex() || (skipEmptyColumns
             && parsed.getColumnLength(i) == 0)) {
           continue;
         }
@@ -118,11 +118,11 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
         if (skipBadLines) {
           incrementBadLineCount(1);
           return;
-        }
+        }
         throw new IOException(badLine);
       } catch (InterruptedException e) {
         e.printStackTrace();
         Thread.currentThread().interrupt();
       }
     }
   }
 }
@@ -21,6 +21,6 @@ Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce
 Input/OutputFormats, a table indexing MapReduce job, and utility methods.

 <p>See <a href="http://hbase.apache.org/book.html#mapreduce">HBase and MapReduce</a>
-in the HBase Reference Guide for mapreduce over hbase documentation.
+in the HBase Reference Guide for mapreduce over hbase documentation.
 */
 package org.apache.hadoop.hbase.mapreduce;
@@ -45,8 +45,8 @@ import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -514,7 +514,7 @@ public class VerifyReplication extends Configured implements Tool {
       versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
       continue;
     }

     final String batchArgKey = "--batch=";
     if (cmd.startsWith(batchArgKey)) {
       batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
@@ -683,7 +683,7 @@ public class VerifyReplication extends Configured implements Tool {
     Job job = createSubmittableJob(conf, args);
     if (job != null) {
       return job.waitForCompletion(true) ? 0 : 1;
-    }
+    }
     return 1;
   }

@@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
 import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
 import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.util.*;
@@ -36,6 +36,6 @@ public class TestDriver {
     ProgramDriver programDriverMock = mock(ProgramDriver.class);
     Driver.setProgramDriver(programDriverMock);
     Driver.main(new String[]{});
-    verify(programDriverMock).driver(Mockito.any(String[].class));
+    verify(programDriverMock).driver(Mockito.any(String[].class));
   }
 }
@@ -65,7 +65,7 @@ public class TestGroupingTableMap {
       cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
       JobConf jobConf = new JobConf(cfg);
       gTableMap.configure(jobConf);

       byte[] row = {};
       List<Cell> keyValues = ImmutableList.<Cell>of(
           new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("1111")),
@@ -79,7 +79,7 @@ public class TestGroupingTableMap {
       verifyZeroInteractions(outputCollectorMock);
     } finally {
       if (gTableMap != null)
-        gTableMap.close();
+        gTableMap.close();
     }
   }

@@ -95,7 +95,7 @@ public class TestGroupingTableMap {
       cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
       JobConf jobConf = new JobConf(cfg);
       gTableMap.configure(jobConf);

       byte[] row = {};
       List<Cell> keyValues = ImmutableList.<Cell>of(
           new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("1111")),
@@ -118,7 +118,7 @@ public class TestGroupingTableMap {
   @Test
   @SuppressWarnings({ "deprecation" })
   public void shouldCreateNewKey() throws Exception {
-    GroupingTableMap gTableMap = null;
+    GroupingTableMap gTableMap = null;
     try {
       Result result = mock(Result.class);
       Reporter reporter = mock(Reporter.class);
@@ -128,7 +128,7 @@ public class TestGroupingTableMap {
       cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
       JobConf jobConf = new JobConf(cfg);
       gTableMap.configure(jobConf);

       final byte[] firstPartKeyValue = Bytes.toBytes("34879512738945");
       final byte[] secondPartKeyValue = Bytes.toBytes("35245142671437");
       byte[] row = {};
@@ -136,7 +136,7 @@ public class TestGroupingTableMap {
           new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), firstPartKeyValue),
           new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), secondPartKeyValue));
       when(result.listCells()).thenReturn(cells);

       final AtomicBoolean outputCollected = new AtomicBoolean();
       OutputCollector<ImmutableBytesWritable, Result> outputCollector =
           new OutputCollector<ImmutableBytesWritable, Result>() {
@@ -148,11 +148,11 @@ public class TestGroupingTableMap {
               outputCollected.set(true);
             }
           };

       gTableMap.map(null, result, outputCollector, reporter);
       verify(result).listCells();
       Assert.assertTrue("Output not received", outputCollected.get());

       final byte[] firstPartValue = Bytes.toBytes("238947928");
       final byte[] secondPartValue = Bytes.toBytes("4678456942345");
       byte[][] data = { firstPartValue, secondPartValue };
@@ -49,11 +49,11 @@ public class TestIdentityTableMap {
       ImmutableBytesWritable bytesWritableMock = mock(ImmutableBytesWritable.class);
       OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
           mock(OutputCollector.class);

       for (int i = 0; i < recordNumber; i++)
         identityTableMap.map(bytesWritableMock, resultMock, outputCollectorMock,
             reporterMock);

       verify(outputCollectorMock, times(recordNumber)).collect(
           Mockito.any(ImmutableBytesWritable.class), Mockito.any(Result.class));
     } finally {
@@ -71,7 +71,6 @@ import org.mockito.stubbing.Answer;

 /**
  * This tests the TableInputFormat and its recovery semantics
- *
  */
 @Category({MapReduceTests.class, LargeTests.class})
 public class TestTableInputFormat {
@@ -103,7 +102,7 @@ public class TestTableInputFormat {

   /**
    * Setup a table with two rows and values.
-   *
+   *
    * @param tableName
    * @return
    * @throws IOException
@@ -114,7 +113,7 @@ public class TestTableInputFormat {

   /**
    * Setup a table with two rows and values per column family.
-   *
+   *
    * @param tableName
    * @return
    * @throws IOException
@@ -136,7 +135,7 @@ public class TestTableInputFormat {

   /**
    * Verify that the result and key have expected values.
-   *
+   *
    * @param r
    * @param key
    * @param expectedKey
@@ -155,12 +154,12 @@ public class TestTableInputFormat {
   /**
    * Create table data and run tests on specified htable using the
    * o.a.h.hbase.mapred API.
-   *
+   *
    * @param table
    * @throws IOException
    */
   static void runTestMapred(Table table) throws IOException {
-    org.apache.hadoop.hbase.mapred.TableRecordReader trr =
+    org.apache.hadoop.hbase.mapred.TableRecordReader trr =
         new org.apache.hadoop.hbase.mapred.TableRecordReader();
     trr.setStartRow("aaa".getBytes());
     trr.setEndRow("zzz".getBytes());
@@ -186,7 +185,7 @@ public class TestTableInputFormat {

   /**
    * Create a table that IOE's on first scanner next call
-   *
+   *
    * @throws IOException
    */
   static Table createIOEScannerTable(byte[] name, final int failCnt)
@@ -221,7 +220,7 @@ public class TestTableInputFormat {
   /**
    * Create a table that throws a DoNoRetryIOException on first scanner next
    * call
-   *
+   *
    * @throws IOException
    */
   static Table createDNRIOEScannerTable(byte[] name, final int failCnt)
@@ -258,7 +257,7 @@ public class TestTableInputFormat {

   /**
    * Run test assuming no errors using mapred api.
-   *
+   *
    * @throws IOException
    */
   @Test
@@ -269,7 +268,7 @@ public class TestTableInputFormat {

   /**
    * Run test assuming Scanner IOException failure using mapred api,
-   *
+   *
    * @throws IOException
    */
   @Test
@@ -280,7 +279,7 @@ public class TestTableInputFormat {

   /**
    * Run test assuming Scanner IOException failure using mapred api,
-   *
+   *
    * @throws IOException
    */
   @Test(expected = IOException.class)
@@ -291,7 +290,7 @@ public class TestTableInputFormat {

   /**
    * Run test assuming NotServingRegionException using mapred api.
-   *
+   *
    * @throws org.apache.hadoop.hbase.DoNotRetryIOException
    */
   @Test
@@ -302,7 +301,7 @@ public class TestTableInputFormat {

   /**
    * Run test assuming NotServingRegionException using mapred api.
-   *
+   *
    * @throws org.apache.hadoop.hbase.DoNotRetryIOException
    */
   @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)