HBASE-4079 HTableUtil - helper class for loading data (Doug Meil via Ted Yu)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1144581 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2011-07-09 03:15:02 +00:00
parent c08fe0f40e
commit a618fea162
3 changed files with 263 additions and 0 deletions

View File

@ -312,6 +312,7 @@ Release 0.91.0 - Unreleased
HBASE-3240 Improve documentation of importtsv and bulk loads.
(Aaron T. Myers via todd)
HBASE-4054 Usability improvement to HTablePool (Daniel Iancu)
HBASE-4079 HTableUtil - helper class for loading data (Doug Meil via Ted Yu)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -0,0 +1,137 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.lang.InterruptedException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
/**
* Utility class for HTable.
*
*
*/
public class HTableUtil {
private static final int INITIAL_LIST_SIZE = 250;
/**
* Processes a List of Puts and writes them to an HTable instance in RegionServer buckets via the htable.put method.
* This will utilize the writeBuffer, thus the writeBuffer flush frequency may be tuned accordingly via htable.setWriteBufferSize.
* <br><br>
* The benefit of submitting Puts in this manner is to minimize the number of RegionServer RPCs in each flush.
* <br><br>
* Assumption #1: Regions have been pre-created for the table. If they haven't, then all of the Puts will go to the same region,
* defeating the purpose of this utility method. See the Apache HBase book for an explanation of how to do this.
* <br>
* Assumption #2: Row-keys are not monotonically increasing. See the Apache HBase book for an explanation of this problem.
* <br>
* Assumption #3: That the input list of Puts is big enough to be useful (in the thousands or more). The intent of this
* method is to process larger chunks of data.
* <br>
* Assumption #4: htable.setAutoFlush(false) has been set. This is a requirement to use the writeBuffer.
* <br><br>
* @param htable HTable instance for target HBase table
* @param puts List of Put instances
* @throws IOException if a remote or network exception occurs
*
*/
public static void bucketRsPut(HTable htable, List<Put> puts) throws IOException {
Map<String, List<Put>> putMap = createRsPutMap(htable, puts);
for (List<Put> rsPuts: putMap.values()) {
htable.put( rsPuts );
}
htable.flushCommits();
}
/**
* Processes a List of Rows (Put, Delete) and writes them to an HTable instance in RegionServer buckets via the htable.batch method.
* <br><br>
* The benefit of submitting Puts in this manner is to minimize the number of RegionServer RPCs, thus this will
* produce one RPC of Puts per RegionServer.
* <br><br>
* Assumption #1: Regions have been pre-created for the table. If they haven't, then all of the Puts will go to the same region,
* defeating the purpose of this utility method. See the Apache HBase book for an explanation of how to do this.
* <br>
* Assumption #2: Row-keys are not monotonically increasing. See the Apache HBase book for an explanation of this problem.
* <br>
* Assumption #3: That the input list of Rows is big enough to be useful (in the thousands or more). The intent of this
* method is to process larger chunks of data.
* <br><br>
* This method accepts a list of Row objects because the underlying .batch method accepts a list of Row objects.
* <br><br>
* @param htable HTable instance for target HBase table
* @param rows List of Row instances
* @throws IOException if a remote or network exception occurs
*/
public static void bucketRsBatch(HTable htable, List<Row> rows) throws IOException {
try {
Map<String, List<Row>> rowMap = createRsRowMap(htable, rows);
for (List<Row> rsRows: rowMap.values()) {
htable.batch( rsRows );
}
} catch (InterruptedException e) {
throw new IOException(e);
}
}
private static Map<String,List<Put>> createRsPutMap(HTable htable, List<Put> puts) throws IOException {
Map<String, List<Put>> putMap = new HashMap<String, List<Put>>();
for (Put put: puts) {
HRegionLocation rl = htable.getRegionLocation( put.getRow() );
String hostname = rl.getHostname();
List<Put> recs = putMap.get( hostname);
if (recs == null) {
recs = new ArrayList<Put>(INITIAL_LIST_SIZE);
putMap.put( hostname, recs);
}
recs.add(put);
}
return putMap;
}
private static Map<String,List<Row>> createRsRowMap(HTable htable, List<Row> rows) throws IOException {
Map<String, List<Row>> rowMap = new HashMap<String, List<Row>>();
for (Row row: rows) {
HRegionLocation rl = htable.getRegionLocation( row.getRow() );
String hostname = rl.getHostname();
List<Row> recs = rowMap.get( hostname);
if (recs == null) {
recs = new ArrayList<Row>(INITIAL_LIST_SIZE);
rowMap.put( hostname, recs);
}
recs.add(row);
}
return rowMap;
}
}

View File

@ -0,0 +1,125 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* This class provides tests for the {@link HTableUtil} class
*
*/
public class TestHTableUtil {
final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static byte [] ROW = Bytes.toBytes("testRow");
private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte [] VALUE = Bytes.toBytes("testValue");
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(3);
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
*
* @throws Exception
*/
@Test
public void testBucketPut() throws Exception {
byte [] TABLE = Bytes.toBytes("testBucketPut");
HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
ht.setAutoFlush( false );
List<Put> puts = new ArrayList<Put>();
puts.add( createPut("row1") );
puts.add( createPut("row2") );
puts.add( createPut("row3") );
puts.add( createPut("row4") );
HTableUtil.bucketRsPut( ht, puts );
Scan scan = new Scan();
scan.addColumn(FAMILY, QUALIFIER);
int count = 0;
for(Result result : ht.getScanner(scan)) {
count++;
}
LOG.info("bucket put count=" + count);
assertEquals(count, puts.size());
}
private Put createPut(String row) {
Put put = new Put( Bytes.toBytes(row));
put.add(FAMILY, QUALIFIER, VALUE);
return put;
}
/**
*
* @throws Exception
*/
@Test
public void testBucketBatch() throws Exception {
byte [] TABLE = Bytes.toBytes("testBucketBatch");
HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
List<Row> rows = new ArrayList<Row>();
rows.add( createPut("row1") );
rows.add( createPut("row2") );
rows.add( createPut("row3") );
rows.add( createPut("row4") );
HTableUtil.bucketRsBatch( ht, rows );
Scan scan = new Scan();
scan.addColumn(FAMILY, QUALIFIER);
int count = 0;
for(Result result : ht.getScanner(scan)) {
count++;
}
LOG.info("bucket batch count=" + count);
assertEquals(count, rows.size());
}
}