From a618fea1629b7f2dce8cd78eae81fd80af0e7360 Mon Sep 17 00:00:00 2001
From: Zhihong Yu
Date: Sat, 9 Jul 2011 03:15:02 +0000
Subject: [PATCH] HBASE-4079 HTableUtil - helper class for loading data (Doug Meil via Ted Yu)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1144581 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                   |   1 +
 .../hadoop/hbase/client/HTableUtil.java       | 137 ++++++++++++++++++
 .../hadoop/hbase/client/TestHTableUtil.java   | 125 ++++++++++++++++
 3 files changed, 263 insertions(+)
 create mode 100644 src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java
 create mode 100644 src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java

diff --git a/CHANGES.txt b/CHANGES.txt
index d7774a4f80c..2f20947b84b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -312,6 +312,7 @@ Release 0.91.0 - Unreleased
    HBASE-3240  Improve documentation of importtsv and bulk loads.
                (Aaron T. Myers via todd)
    HBASE-4054  Usability improvement to HTablePool (Daniel Iancu)
+   HBASE-4079  HTableUtil - helper class for loading data (Doug Meil via Ted Yu)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel
diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java b/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java
new file mode 100644
index 00000000000..bc0872af804
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java
@@ -0,0 +1,137 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+
+/**
+ * Utility class for HTable.
+ */
+public class HTableUtil {
+
+  private static final int INITIAL_LIST_SIZE = 250;
+
+  /**
+   * Processes a List of Puts and writes them to an HTable instance in RegionServer buckets via the htable.put method.
+   * This will utilize the writeBuffer, so the writeBuffer flush frequency may be tuned accordingly via htable.setWriteBufferSize.
+   * <br><br>
+   * The benefit of submitting Puts in this manner is to minimize the number of RegionServer RPCs in each flush.
+   * <br><br>
+   * Assumption #1: Regions have been pre-created for the table. If they haven't, then all of the Puts will go to the same region,
+   * defeating the purpose of this utility method. See the Apache HBase book for an explanation of how to do this.
+   * <br>
+   * Assumption #2: Row-keys are not monotonically increasing. See the Apache HBase book for an explanation of this problem.
+   * <br>
+   * Assumption #3: The input list of Puts is big enough to be useful (in the thousands or more). The intent of this
+   * method is to process larger chunks of data.
+   * <br>
+   * Assumption #4: htable.setAutoFlush(false) has been set. This is a requirement to use the writeBuffer.
+   * <br><br>
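+   * Example usage (illustrative; conf and puts are assumed to exist, and "t1" is a hypothetical pre-split table):
+   * <pre>{@code
+   *   HTable htable = new HTable(conf, "t1");
+   *   htable.setAutoFlush(false);
+   *   HTableUtil.bucketRsPut(htable, puts);
+   * }</pre>
+   * <br>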
+   * @param htable HTable instance for target HBase table
+   * @param puts List of Put instances
+   * @throws IOException if a remote or network exception occurs
+   */
+  public static void bucketRsPut(HTable htable, List<Put> puts) throws IOException {
+
+    Map<String, List<Put>> putMap = createRsPutMap(htable, puts);
+    for (List<Put> rsPuts: putMap.values()) {
+      htable.put( rsPuts );
+    }
+    htable.flushCommits();
+  }
+
+  /**
+   * Processes a List of Rows (Put, Delete) and writes them to an HTable instance in RegionServer buckets via the htable.batch method.
+   * <br><br>
+   * The benefit of submitting Puts in this manner is to minimize the number of RegionServer RPCs: each call produces
+   * one RPC of Puts per RegionServer.
+   * <br><br>
+   * Assumption #1: Regions have been pre-created for the table. If they haven't, then all of the Puts will go to the same region,
+   * defeating the purpose of this utility method. See the Apache HBase book for an explanation of how to do this.
+   * <br>
+   * Assumption #2: Row-keys are not monotonically increasing. See the Apache HBase book for an explanation of this problem.
+   * <br>
+   * Assumption #3: The input list of Rows is big enough to be useful (in the thousands or more). The intent of this
+   * method is to process larger chunks of data.
+   * <br><br>
+   * This method accepts a list of Row objects because the underlying .batch method accepts a list of Row objects.
+   * <br><br>
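+   * Example usage (illustrative; htable, put, and delete are assumed to exist):
+   * <pre>{@code
+   *   List<Row> rows = new ArrayList<Row>();
+   *   rows.add(put);     // a Put
+   *   rows.add(delete);  // a Delete
+   *   HTableUtil.bucketRsBatch(htable, rows);
+   * }</pre>
+   * <br>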
+   * @param htable HTable instance for target HBase table
+   * @param rows List of Row instances
+   * @throws IOException if a remote or network exception occurs
+   */
+  public static void bucketRsBatch(HTable htable, List<Row> rows) throws IOException {
+
+    try {
+      Map<String, List<Row>> rowMap = createRsRowMap(htable, rows);
+      for (List<Row> rsRows: rowMap.values()) {
+        htable.batch( rsRows );
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+
+  }
+
+  private static Map<String, List<Put>> createRsPutMap(HTable htable, List<Put> puts) throws IOException {
+
+    // Bucket each Put by the hostname of the RegionServer currently hosting its row.
+    Map<String, List<Put>> putMap = new HashMap<String, List<Put>>();
+    for (Put put: puts) {
+      HRegionLocation rl = htable.getRegionLocation( put.getRow() );
+      String hostname = rl.getHostname();
+      List<Put> recs = putMap.get( hostname);
+      if (recs == null) {
+        recs = new ArrayList<Put>(INITIAL_LIST_SIZE);
+        putMap.put( hostname, recs);
+      }
+      recs.add(put);
+    }
+    return putMap;
+  }
+
+  private static Map<String, List<Row>> createRsRowMap(HTable htable, List<Row> rows) throws IOException {
+
+    // Same bucketing as createRsPutMap, but for generic Row operations (Put, Delete).
+    Map<String, List<Row>> rowMap = new HashMap<String, List<Row>>();
+    for (Row row: rows) {
+      HRegionLocation rl = htable.getRegionLocation( row.getRow() );
+      String hostname = rl.getHostname();
+      List<Row> recs = rowMap.get( hostname);
+      if (recs == null) {
+        recs = new ArrayList<Row>(INITIAL_LIST_SIZE);
+        rowMap.put( hostname, recs);
+      }
+      recs.add(row);
+    }
+    return rowMap;
+  }
+
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java b/src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java
new file mode 100644
index 00000000000..f0be2c86747
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This class provides tests for the {@link HTableUtil} class.
+ */
+public class TestHTableUtil {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static byte [] ROW = Bytes.toBytes("testRow");
+  private static byte [] FAMILY = Bytes.toBytes("testFamily");
+  private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static byte [] VALUE = Bytes.toBytes("testValue");
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * @throws Exception
+   */
+  @Test
+  public void testBucketPut() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testBucketPut");
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
+    ht.setAutoFlush( false );
+
+    List<Put> puts = new ArrayList<Put>();
+    puts.add( createPut("row1") );
+    puts.add( createPut("row2") );
+    puts.add( createPut("row3") );
+    puts.add( createPut("row4") );
+
+    HTableUtil.bucketRsPut( ht, puts );
+
+    Scan scan = new Scan();
+    scan.addColumn(FAMILY, QUALIFIER);
+    int count = 0;
+    for (Result result : ht.getScanner(scan)) {
+      count++;
+    }
+    LOG.info("bucket put count=" + count);
+    assertEquals(puts.size(), count);
+  }
+
+  private Put createPut(String row) {
+    Put put = new Put( Bytes.toBytes(row));
+    put.add(FAMILY, QUALIFIER, VALUE);
+    return put;
+  }
+
+  /**
+   * @throws Exception
+   */
+  @Test
+  public void testBucketBatch() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testBucketBatch");
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
+
+    List<Row> rows = new ArrayList<Row>();
+    rows.add( createPut("row1") );
+    rows.add( createPut("row2") );
+    rows.add( createPut("row3") );
+    rows.add( createPut("row4") );
+
+    HTableUtil.bucketRsBatch( ht, rows );
+
+    Scan scan = new Scan();
+    scan.addColumn(FAMILY, QUALIFIER);
+
+    int count = 0;
+    for (Result result : ht.getScanner(scan)) {
+      count++;
+    }
+    LOG.info("bucket batch count=" + count);
+    assertEquals(rows.size(), count);
+  }
+
+}
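
Note on Assumption #1: both bucketing methods only pay off against a pre-split table; with a single region, every
operation lands in one bucket. A minimal sketch of pre-creating regions with the client API of this era follows;
the table name "t1", family "f1", and split points are hypothetical, not part of this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.util.Bytes;

    // Create table "t1" pre-split into four regions, so that
    // bucketRsPut/bucketRsBatch have multiple RegionServers to bucket by.
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    HTableDescriptor desc = new HTableDescriptor("t1");
    desc.addFamily(new HColumnDescriptor("f1"));
    byte[][] splitKeys = new byte[][] {
      Bytes.toBytes("row2"), Bytes.toBytes("row3"), Bytes.toBytes("row4")
    };
    admin.createTable(desc, splitKeys);  // yields splitKeys.length + 1 regions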