From dd2f3d4654009698887f76b417635eeacea443b2 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Tue, 15 Sep 2009 16:25:36 +0000 Subject: [PATCH] HBASE-1574 Client and server APIs to do batch deletes git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@815387 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../apache/hadoop/hbase/client/Delete.java | 6 +- .../hadoop/hbase/client/HConnection.java | 18 +- .../hbase/client/HConnectionManager.java | 227 ++++++++++++------ .../apache/hadoop/hbase/client/HTable.java | 21 +- .../org/apache/hadoop/hbase/client/Put.java | 12 +- .../org/apache/hadoop/hbase/client/Row.java | 30 +++ .../hadoop/hbase/io/HbaseObjectWritable.java | 2 + .../hadoop/hbase/ipc/HRegionInterface.java | 17 +- .../hbase/regionserver/HRegionServer.java | 32 ++- .../hadoop/hbase/client/TestClient.java | 36 ++- 11 files changed, 308 insertions(+), 94 deletions(-) create mode 100644 src/java/org/apache/hadoop/hbase/client/Row.java diff --git a/CHANGES.txt b/CHANGES.txt index 3b526656860..961045dc6d9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -49,6 +49,7 @@ Release 0.21.0 - Unreleased HBASE-1823 Ability for Scanners to bypass the block cache HBASE-1827 Add disabling block cache scanner flag to the shell HBASE-1835 Add more delete tests + HBASE-1574 Client and server APIs to do batch deletes OPTIMIZATIONS diff --git a/src/java/org/apache/hadoop/hbase/client/Delete.java b/src/java/org/apache/hadoop/hbase/client/Delete.java index 7fc407b3fb4..1dfa863712d 100644 --- a/src/java/org/apache/hadoop/hbase/client/Delete.java +++ b/src/java/org/apache/hadoop/hbase/client/Delete.java @@ -65,7 +65,7 @@ import org.apache.hadoop.hbase.util.Bytes; * deleteFamily -- then you need to use the method overrides that take a * timestamp. The constructor timestamp is not referenced. */ -public class Delete implements Writable { +public class Delete implements Writable, Row, Comparable { private byte [] row = null; // This ts is only used when doing a deleteRow. Anything less, private long ts; @@ -122,6 +122,10 @@ public class Delete implements Writable { this.familyMap.putAll(d.getFamilyMap()); } + public int compareTo(final Row d) { + return Bytes.compareTo(this.getRow(), d.getRow()); + } + /** * Method to check if the familyMap is empty * @return true if empty, false otherwise diff --git a/src/java/org/apache/hadoop/hbase/client/HConnection.java b/src/java/org/apache/hadoop/hbase/client/HConnection.java index 10aa41db749..b95cc69a38d 100644 --- a/src/java/org/apache/hadoop/hbase/client/HConnection.java +++ b/src/java/org/apache/hadoop/hbase/client/HConnection.java @@ -183,12 +183,22 @@ public interface HConnection { /** - * Process a batch of rows. Currently it only works for updates until - * HBASE-880 is available. Does the retries. - * @param list A batch of rows to process + * Process a batch of Puts. Does the retries. + * @param list A batch of Puts to process. * @param tableName The name of the table + * @return Count of committed Puts. On fault, < list.size(). * @throws IOException */ public int processBatchOfRows(ArrayList list, byte[] tableName) - throws IOException; + throws IOException; + + /** + * Process a batch of Deletes. Does the retries. + * @param list A batch of Deletes to process. + * @return Count of committed Deletes. On fault, < list.size(). + * @param tableName The name of the table + * @throws IOException + */ + public int processBatchOfDeletes(ArrayList list, byte[] tableName) + throws IOException; } \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java index b50230e17e9..19cc42a72db 100644 --- a/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -65,6 +65,9 @@ import org.apache.zookeeper.Watcher.Event.KeeperState; * Used by {@link HTable} and {@link HBaseAdmin} */ public class HConnectionManager implements HConstants { + private static final Delete [] DELETE_ARRAY_TYPE = new Delete[0]; + private static final Put [] PUT_ARRAY_TYPE = new Put[0]; + /* * Not instantiable. */ @@ -1054,85 +1057,167 @@ public class HConnectionManager implements HConstants { return location; } - public int processBatchOfRows(ArrayList list, byte[] tableName) - throws IOException { - if (list.isEmpty()) { - return 0; + /* + * Helper class for batch updates. + * Holds code shared doing batch puts and batch deletes. + */ + private abstract class Batch { + final HConnection c; + + private Batch(final HConnection c) { + this.c = c; } - boolean retryOnlyOne = false; - if (list.size() > 1) { - Collections.sort(list); - } - List currentPuts = new ArrayList(); - HRegionLocation location = - getRegionLocationForRowWithRetries(tableName, list.get(0).getRow(), - false); - byte [] currentRegion = location.getRegionInfo().getRegionName(); - byte [] region = currentRegion; - boolean isLastRow = false; - Put [] putarray = new Put[0]; - int i, tries; - for (i = 0, tries = 0; i < list.size() && tries < this.numRetries; i++) { - Put put = list.get(i); - currentPuts.add(put); - // If the next Put goes to a new region, then we are to clear - // currentPuts now during this cycle. - isLastRow = (i + 1) == list.size(); - if (!isLastRow) { - location = getRegionLocationForRowWithRetries(tableName, - list.get(i + 1).getRow(), false); - region = location.getRegionInfo().getRegionName(); + + /** + * This is the method subclasses must implement. + * @param currentList + * @param tableName + * @param row + * @return Count of items processed or -1 if all. + * @throws IOException + * @throws RuntimeException + */ + abstract int doCall(final List currentList, + final byte [] row, final byte [] tableName) + throws IOException, RuntimeException; + + /** + * Process the passed list. + * @param list + * @param tableName + * @return Count of how many added or -1 if all added. + * @throws IOException + */ + int process(final ArrayList list, final byte[] tableName) + throws IOException { + byte [] region = getRegionName(tableName, list.get(0).getRow(), false); + byte [] currentRegion = region; + boolean isLastRow = false; + boolean retryOnlyOne = false; + List currentList = new ArrayList(); + int i, tries; + for (i = 0, tries = 0; i < list.size() && tries < numRetries; i++) { + Row row = list.get(i); + currentList.add(row); + // If the next record goes to a new region, then we are to clear + // currentList now during this cycle. + isLastRow = (i + 1) == list.size(); + if (!isLastRow) { + region = getRegionName(tableName, list.get(i + 1).getRow(), false); + } + if (!Bytes.equals(currentRegion, region) || isLastRow || retryOnlyOne) { + int index = doCall(currentList, row.getRow(), tableName); + // index is == -1 if all processed successfully, else its index + // of last record successfully processed. + if (index != -1) { + if (tries == numRetries - 1) { + throw new RetriesExhaustedException("Some server, retryOnlyOne=" + + retryOnlyOne + ", index=" + index + ", islastrow=" + isLastRow + + ", tries=" + tries + ", numtries=" + numRetries + ", i=" + i + + ", listsize=" + list.size() + ", region=" + + Bytes.toStringBinary(region), currentRegion, row.getRow(), + tries, new ArrayList()); + } + tries = doBatchPause(currentRegion, tries); + i = i - currentList.size() + index; + retryOnlyOne = true; + // Reload location. + region = getRegionName(tableName, list.get(i + 1).getRow(), true); + } else { + // Reset these flags/counters on successful batch Put + retryOnlyOne = false; + tries = 0; + } + currentRegion = region; + currentList.clear(); + } } - if (!Bytes.equals(currentRegion, region) || isLastRow || retryOnlyOne) { - final Put [] puts = currentPuts.toArray(putarray); - int index = getRegionServerWithRetries(new ServerCallable( - this, tableName, put.getRow()) { + return i; + } + + /* + * @param t + * @param r + * @param re + * @return Region name that holds passed row r + * @throws IOException + */ + private byte [] getRegionName(final byte [] t, final byte [] r, + final boolean re) + throws IOException { + HRegionLocation location = getRegionLocationForRowWithRetries(t, r, re); + return location.getRegionInfo().getRegionName(); + } + + /* + * Do pause processing before retrying... + * @param currentRegion + * @param tries + * @return New value for tries. + */ + private int doBatchPause(final byte [] currentRegion, final int tries) { + int localTries = tries; + long sleepTime = getPauseTime(tries); + if (LOG.isDebugEnabled()) { + LOG.debug("Reloading region " + Bytes.toStringBinary(currentRegion) + + " location because regionserver didn't accept updates; tries=" + + tries + " of max=" + numRetries + ", waiting=" + sleepTime + "ms"); + } + try { + Thread.sleep(sleepTime); + localTries++; + } catch (InterruptedException e) { + // continue + } + return localTries; + } + } + + public int processBatchOfRows(final ArrayList list, + final byte[] tableName) + throws IOException { + if (list.isEmpty()) return 0; + if (list.size() > 1) Collections.sort(list); + Batch b = new Batch(this) { + @Override + int doCall(final List currentList, final byte [] row, + final byte [] tableName) + throws IOException, RuntimeException { + final Put [] puts = currentList.toArray(PUT_ARRAY_TYPE); + return getRegionServerWithRetries(new ServerCallable(this.c, + tableName, row) { public Integer call() throws IOException { return server.put(location.getRegionInfo().getRegionName(), puts); } }); - // index is == -1 if all puts processed successfully, else its index - // of last Put successfully processed. - if (index != -1) { - if (tries == numRetries - 1) { - throw new RetriesExhaustedException("Some server, retryOnlyOne=" + - retryOnlyOne + ", index=" + index + ", islastrow=" + isLastRow + - ", tries=" + tries + ", numtries=" + numRetries + ", i=" + i + - ", listsize=" + list.size() + ", location=" + location + - ", region=" + Bytes.toStringBinary(region), - currentRegion, put.getRow(), tries, new ArrayList()); - } - long sleepTime = getPauseTime(tries); - if (LOG.isDebugEnabled()) { - LOG.debug("Reloading region " + Bytes.toStringBinary(currentRegion) + - " location because regionserver didn't accept updates; " + - "tries=" + tries + " of max=" + this.numRetries + - ", waiting=" + sleepTime + "ms"); - } - try { - Thread.sleep(sleepTime); - tries++; - } catch (InterruptedException e) { - // continue - } - i = i - puts.length + index; - retryOnlyOne = true; - // Reload location. - location = getRegionLocationForRowWithRetries(tableName, - list.get(i + 1).getRow(), true); - region = location.getRegionInfo().getRegionName(); - } else { - // Reset these flags/counters on successful batch Put - retryOnlyOne = false; - tries = 0; - } - currentRegion = region; - currentPuts.clear(); } - } - return i; + }; + return b.process(list, tableName); } + public int processBatchOfDeletes(final ArrayList list, + final byte[] tableName) + throws IOException { + if (list.isEmpty()) return 0; + if (list.size() > 1) Collections.sort(list); + Batch b = new Batch(this) { + @Override + int doCall(final List currentList, final byte [] row, + final byte [] tableName) + throws IOException, RuntimeException { + final Delete [] deletes = currentList.toArray(DELETE_ARRAY_TYPE); + return getRegionServerWithRetries(new ServerCallable(this.c, + tableName, row) { + public Integer call() throws IOException { + return server.delete(location.getRegionInfo().getRegionName(), + deletes); + } + }); + } + }; + return b.process(list, tableName); + } + void close(boolean stopProxy) { if (master != null) { if (stopProxy) { @@ -1150,4 +1235,4 @@ public class HConnectionManager implements HConstants { } } } -} +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/client/HTable.java b/src/java/org/apache/hadoop/hbase/client/HTable.java index 21a8599b7bc..cf00fe0e862 100644 --- a/src/java/org/apache/hadoop/hbase/client/HTable.java +++ b/src/java/org/apache/hadoop/hbase/client/HTable.java @@ -438,6 +438,23 @@ public class HTable implements HTableInterface { ); } + /** + * Bulk commit a List of Deletes to the table. + * @param deletes List of deletes. List is modified by this method. On + * exception holds deletes that were NOT applied. + * @throws IOException + * @since 0.20.1 + */ + public synchronized void delete(final ArrayList deletes) + throws IOException { + int last = 0; + try { + last = connection.processBatchOfDeletes(deletes, this.tableName); + } finally { + deletes.subList(0, last).clear(); + } + } + /** * Commit a Put to the table. *

@@ -464,12 +481,12 @@ public class HTable implements HTableInterface { * @since 0.20.0 */ public synchronized void put(final List puts) throws IOException { - for(Put put : puts) { + for (Put put : puts) { validatePut(put); writeBuffer.add(put); currentWriteBufferSize += put.heapSize(); } - if(autoFlush || currentWriteBufferSize > writeBufferSize) { + if (autoFlush || currentWriteBufferSize > writeBufferSize) { flushCommits(); } } diff --git a/src/java/org/apache/hadoop/hbase/client/Put.java b/src/java/org/apache/hadoop/hbase/client/Put.java index 2532800fcf1..11a46ed68a0 100644 --- a/src/java/org/apache/hadoop/hbase/client/Put.java +++ b/src/java/org/apache/hadoop/hbase/client/Put.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.util.ClassSize; * for each column to be inserted, execute {@link #add(byte[], byte[], byte[]) add} or * {@link #add(byte[], byte[], long, byte[]) add} if setting the timestamp. */ -public class Put implements HeapSize, Writable, Comparable { +public class Put implements HeapSize, Writable, Row, Comparable { private byte [] row = null; private long timestamp = HConstants.LATEST_TIMESTAMP; private long lockId = -1L; @@ -158,10 +158,10 @@ public class Put implements HeapSize, Writable, Comparable { int res = Bytes.compareTo(this.row, 0, row.length, kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()); if(res != 0) { - throw new IOException("The row in the recently added KeyValue " + - Bytes.toStringBinary(kv.getBuffer(), kv.getRowOffset(), - kv.getRowLength()) + " doesn't match the original one " + - Bytes.toStringBinary(this.row)); + throw new IOException("The row in the recently added KeyValue " + + Bytes.toStringBinary(kv.getBuffer(), kv.getRowOffset(), + kv.getRowLength()) + " doesn't match the original one " + + Bytes.toStringBinary(this.row)); } list.add(kv); familyMap.put(family, list); @@ -293,7 +293,7 @@ public class Put implements HeapSize, Writable, Comparable { return sb.toString(); } - public int compareTo(Put p) { + public int compareTo(Row p) { return Bytes.compareTo(this.getRow(), p.getRow()); } diff --git a/src/java/org/apache/hadoop/hbase/client/Row.java b/src/java/org/apache/hadoop/hbase/client/Row.java new file mode 100644 index 00000000000..6017dadab03 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/Row.java @@ -0,0 +1,30 @@ +/* + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +/** + * Has a row. + */ +interface Row { + /** + * @return The row. + */ + public byte [] getRow(); +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java b/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java index a770d73f48b..2b2b7467747 100644 --- a/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java +++ b/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java @@ -151,6 +151,8 @@ public class HbaseObjectWritable implements Writable, Configurable { addToMap(QualifierFilter.class, code++); addToMap(SkipFilter.class, code++); addToMap(WritableByteArrayComparable.class, code++); + + addToMap(Delete [].class, code++); } private Class declaredClass; diff --git a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java index 0e33b4e9877..7d2d3d73048 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java @@ -108,8 +108,7 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion { */ public int put(final byte[] regionName, final Put [] puts) throws IOException; - - + /** * Deletes all the KeyValues that match those found in the Delete object, * if their ts <= to the Delete. In case of a delete with a specific ts it @@ -120,7 +119,19 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion { */ public void delete(final byte[] regionName, final Delete delete) throws IOException; - + + /** + * Put an array of deletes into the specified region + * + * @param regionName + * @param deletes + * @return The number of processed deletes. Returns -1 if all Deletes + * processed successfully. + * @throws IOException + */ + public int delete(final byte[] regionName, final Delete [] deletes) + throws IOException; + /** * Atomically checks if a row/family/qualifier value match the expectedValue. * If it does, it adds the put. diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5b46579e746..c03717cc0d2 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1798,7 +1798,7 @@ public class HRegionServer implements HConstants, HRegionInterface, throw convertThrowableToIOE(cleanup(t)); } } - + public int put(final byte[] regionName, final Put [] puts) throws IOException { // Count of Puts processed. @@ -1824,7 +1824,6 @@ public class HRegionServer implements HConstants, HRegionInterface, // All have been processed successfully. return -1; } - /** * @@ -1989,7 +1988,6 @@ public class HRegionServer implements HConstants, HRegionInterface, // // Methods that do the actual work for the remote API // - public void delete(final byte [] regionName, final Delete delete) throws IOException { checkOpen(); @@ -2006,7 +2004,33 @@ public class HRegionServer implements HConstants, HRegionInterface, throw convertThrowableToIOE(cleanup(t)); } } - + + public int delete(final byte[] regionName, final Delete [] deletes) + throws IOException { + // Count of Deletes processed. + int i = 0; + checkOpen(); + try { + boolean writeToWAL = true; + this.cacheFlusher.reclaimMemStoreMemory(); + Integer[] locks = new Integer[deletes.length]; + HRegion region = getRegion(regionName); + for (i = 0; i < deletes.length; i++) { + this.requestCount.incrementAndGet(); + locks[i] = getLockFromId(deletes[i].getLockId()); + region.delete(deletes[i], locks[i], writeToWAL); + } + } catch (WrongRegionException ex) { + LOG.debug("Batch deletes: " + i, ex); + return i; + } catch (NotServingRegionException ex) { + return i; + } catch (Throwable t) { + throw convertThrowableToIOE(cleanup(t)); + } + // All have been processed successfully. + return -1; + } public long lockRow(byte [] regionName, byte [] row) throws IOException { diff --git a/src/test/org/apache/hadoop/hbase/client/TestClient.java b/src/test/org/apache/hadoop/hbase/client/TestClient.java index 56f064f8073..6dbed135988 100644 --- a/src/test/org/apache/hadoop/hbase/client/TestClient.java +++ b/src/test/org/apache/hadoop/hbase/client/TestClient.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.util.ArrayList; import org.apache.hadoop.hbase.HBaseClusterTestCase; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -49,7 +50,7 @@ public class TestClient extends HBaseClusterTestCase { super(); } - public void XtestSuperSimple() throws Exception { + public void testSuperSimple() throws Exception { byte [] TABLE = Bytes.toBytes("testSuperSimple"); HTable ht = createTable(TABLE, FAMILY); Put put = new Put(ROW); @@ -1071,7 +1072,6 @@ public class TestClient extends HBaseClusterTestCase { } public void testDeletes() throws Exception { - byte [] TABLE = Bytes.toBytes("testDeletes"); byte [][] ROWS = makeNAscii(ROW, 6); @@ -1318,8 +1318,38 @@ public class TestClient extends HBaseClusterTestCase { assertTrue(Bytes.equals(result.sorted()[0].getValue(), VALUES[1])); assertTrue(Bytes.equals(result.sorted()[1].getValue(), VALUES[2])); scanner.close(); + + // Add test of bulk deleting. + for (int i = 0; i < 10; i++) { + byte [] bytes = Bytes.toBytes(i); + put = new Put(bytes); + put.add(FAMILIES[0], QUALIFIER, bytes); + ht.put(put); + } + for (int i = 0; i < 10; i++) { + byte [] bytes = Bytes.toBytes(i); + get = new Get(bytes); + get.addFamily(FAMILIES[0]); + result = ht.get(get); + assertTrue(result.size() == 1); + } + ArrayList deletes = new ArrayList(); + for (int i = 0; i < 10; i++) { + byte [] bytes = Bytes.toBytes(i); + delete = new Delete(bytes); + delete.deleteFamily(FAMILIES[0]); + deletes.add(delete); + } + ht.delete(deletes); + for (int i = 0; i < 10; i++) { + byte [] bytes = Bytes.toBytes(i); + get = new Get(bytes); + get.addFamily(FAMILIES[0]); + result = ht.get(get); + assertTrue(result.size() == 0); + } } - + /** * Baseline "scalability" test. *