HBASE-1574 Client and server APIs to do batch deletes

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@815387 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-09-15 16:25:36 +00:00
parent 10b3026dc7
commit dd2f3d4654
11 changed files with 308 additions and 94 deletions

View File

@ -49,6 +49,7 @@ Release 0.21.0 - Unreleased
HBASE-1823 Ability for Scanners to bypass the block cache
HBASE-1827 Add disabling block cache scanner flag to the shell
HBASE-1835 Add more delete tests
HBASE-1574 Client and server APIs to do batch deletes
OPTIMIZATIONS

View File

@ -65,7 +65,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* deleteFamily -- then you need to use the method overrides that take a
* timestamp. The constructor timestamp is not referenced.
*/
public class Delete implements Writable {
public class Delete implements Writable, Row, Comparable<Row> {
private byte [] row = null;
// This ts is only used when doing a deleteRow. Anything less,
private long ts;
@ -122,6 +122,10 @@ public class Delete implements Writable {
this.familyMap.putAll(d.getFamilyMap());
}
public int compareTo(final Row d) {
return Bytes.compareTo(this.getRow(), d.getRow());
}
/**
* Method to check if the familyMap is empty
* @return true if empty, false otherwise

View File

@ -183,12 +183,22 @@ public interface HConnection {
/**
* Process a batch of rows. Currently it only works for updates until
* HBASE-880 is available. Does the retries.
* @param list A batch of rows to process
* Process a batch of Puts. Does the retries.
* @param list A batch of Puts to process.
* @param tableName The name of the table
* @return Count of committed Puts. On fault, < list.size().
* @throws IOException
*/
public int processBatchOfRows(ArrayList<Put> list, byte[] tableName)
throws IOException;
throws IOException;
/**
* Process a batch of Deletes. Does the retries.
* @param list A batch of Deletes to process.
* @return Count of committed Deletes. On fault, < list.size().
* @param tableName The name of the table
* @throws IOException
*/
public int processBatchOfDeletes(ArrayList<Delete> list, byte[] tableName)
throws IOException;
}

View File

@ -65,6 +65,9 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
* Used by {@link HTable} and {@link HBaseAdmin}
*/
public class HConnectionManager implements HConstants {
private static final Delete [] DELETE_ARRAY_TYPE = new Delete[0];
private static final Put [] PUT_ARRAY_TYPE = new Put[0];
/*
* Not instantiable.
*/
@ -1054,85 +1057,167 @@ public class HConnectionManager implements HConstants {
return location;
}
public int processBatchOfRows(ArrayList<Put> list, byte[] tableName)
throws IOException {
if (list.isEmpty()) {
return 0;
/*
* Helper class for batch updates.
* Holds code shared doing batch puts and batch deletes.
*/
private abstract class Batch {
final HConnection c;
private Batch(final HConnection c) {
this.c = c;
}
boolean retryOnlyOne = false;
if (list.size() > 1) {
Collections.sort(list);
}
List<Put> currentPuts = new ArrayList<Put>();
HRegionLocation location =
getRegionLocationForRowWithRetries(tableName, list.get(0).getRow(),
false);
byte [] currentRegion = location.getRegionInfo().getRegionName();
byte [] region = currentRegion;
boolean isLastRow = false;
Put [] putarray = new Put[0];
int i, tries;
for (i = 0, tries = 0; i < list.size() && tries < this.numRetries; i++) {
Put put = list.get(i);
currentPuts.add(put);
// If the next Put goes to a new region, then we are to clear
// currentPuts now during this cycle.
isLastRow = (i + 1) == list.size();
if (!isLastRow) {
location = getRegionLocationForRowWithRetries(tableName,
list.get(i + 1).getRow(), false);
region = location.getRegionInfo().getRegionName();
/**
* This is the method subclasses must implement.
* @param currentList
* @param tableName
* @param row
* @return Count of items processed or -1 if all.
* @throws IOException
* @throws RuntimeException
*/
abstract int doCall(final List<Row> currentList,
final byte [] row, final byte [] tableName)
throws IOException, RuntimeException;
/**
* Process the passed <code>list</code>.
* @param list
* @param tableName
* @return Count of how many added or -1 if all added.
* @throws IOException
*/
int process(final ArrayList<? extends Row> list, final byte[] tableName)
throws IOException {
byte [] region = getRegionName(tableName, list.get(0).getRow(), false);
byte [] currentRegion = region;
boolean isLastRow = false;
boolean retryOnlyOne = false;
List<Row> currentList = new ArrayList<Row>();
int i, tries;
for (i = 0, tries = 0; i < list.size() && tries < numRetries; i++) {
Row row = list.get(i);
currentList.add(row);
// If the next record goes to a new region, then we are to clear
// currentList now during this cycle.
isLastRow = (i + 1) == list.size();
if (!isLastRow) {
region = getRegionName(tableName, list.get(i + 1).getRow(), false);
}
if (!Bytes.equals(currentRegion, region) || isLastRow || retryOnlyOne) {
int index = doCall(currentList, row.getRow(), tableName);
// index is == -1 if all processed successfully, else its index
// of last record successfully processed.
if (index != -1) {
if (tries == numRetries - 1) {
throw new RetriesExhaustedException("Some server, retryOnlyOne=" +
retryOnlyOne + ", index=" + index + ", islastrow=" + isLastRow +
", tries=" + tries + ", numtries=" + numRetries + ", i=" + i +
", listsize=" + list.size() + ", region=" +
Bytes.toStringBinary(region), currentRegion, row.getRow(),
tries, new ArrayList<Throwable>());
}
tries = doBatchPause(currentRegion, tries);
i = i - currentList.size() + index;
retryOnlyOne = true;
// Reload location.
region = getRegionName(tableName, list.get(i + 1).getRow(), true);
} else {
// Reset these flags/counters on successful batch Put
retryOnlyOne = false;
tries = 0;
}
currentRegion = region;
currentList.clear();
}
}
if (!Bytes.equals(currentRegion, region) || isLastRow || retryOnlyOne) {
final Put [] puts = currentPuts.toArray(putarray);
int index = getRegionServerWithRetries(new ServerCallable<Integer>(
this, tableName, put.getRow()) {
return i;
}
/*
* @param t
* @param r
* @param re
* @return Region name that holds passed row <code>r</code>
* @throws IOException
*/
private byte [] getRegionName(final byte [] t, final byte [] r,
final boolean re)
throws IOException {
HRegionLocation location = getRegionLocationForRowWithRetries(t, r, re);
return location.getRegionInfo().getRegionName();
}
/*
* Do pause processing before retrying...
* @param currentRegion
* @param tries
* @return New value for tries.
*/
private int doBatchPause(final byte [] currentRegion, final int tries) {
int localTries = tries;
long sleepTime = getPauseTime(tries);
if (LOG.isDebugEnabled()) {
LOG.debug("Reloading region " + Bytes.toStringBinary(currentRegion) +
" location because regionserver didn't accept updates; tries=" +
tries + " of max=" + numRetries + ", waiting=" + sleepTime + "ms");
}
try {
Thread.sleep(sleepTime);
localTries++;
} catch (InterruptedException e) {
// continue
}
return localTries;
}
}
public int processBatchOfRows(final ArrayList<Put> list,
final byte[] tableName)
throws IOException {
if (list.isEmpty()) return 0;
if (list.size() > 1) Collections.sort(list);
Batch b = new Batch(this) {
@Override
int doCall(final List<Row> currentList, final byte [] row,
final byte [] tableName)
throws IOException, RuntimeException {
final Put [] puts = currentList.toArray(PUT_ARRAY_TYPE);
return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
tableName, row) {
public Integer call() throws IOException {
return server.put(location.getRegionInfo().getRegionName(), puts);
}
});
// index is == -1 if all puts processed successfully, else its index
// of last Put successfully processed.
if (index != -1) {
if (tries == numRetries - 1) {
throw new RetriesExhaustedException("Some server, retryOnlyOne=" +
retryOnlyOne + ", index=" + index + ", islastrow=" + isLastRow +
", tries=" + tries + ", numtries=" + numRetries + ", i=" + i +
", listsize=" + list.size() + ", location=" + location +
", region=" + Bytes.toStringBinary(region),
currentRegion, put.getRow(), tries, new ArrayList<Throwable>());
}
long sleepTime = getPauseTime(tries);
if (LOG.isDebugEnabled()) {
LOG.debug("Reloading region " + Bytes.toStringBinary(currentRegion) +
" location because regionserver didn't accept updates; " +
"tries=" + tries + " of max=" + this.numRetries +
", waiting=" + sleepTime + "ms");
}
try {
Thread.sleep(sleepTime);
tries++;
} catch (InterruptedException e) {
// continue
}
i = i - puts.length + index;
retryOnlyOne = true;
// Reload location.
location = getRegionLocationForRowWithRetries(tableName,
list.get(i + 1).getRow(), true);
region = location.getRegionInfo().getRegionName();
} else {
// Reset these flags/counters on successful batch Put
retryOnlyOne = false;
tries = 0;
}
currentRegion = region;
currentPuts.clear();
}
}
return i;
};
return b.process(list, tableName);
}
public int processBatchOfDeletes(final ArrayList<Delete> list,
final byte[] tableName)
throws IOException {
if (list.isEmpty()) return 0;
if (list.size() > 1) Collections.sort(list);
Batch b = new Batch(this) {
@Override
int doCall(final List<Row> currentList, final byte [] row,
final byte [] tableName)
throws IOException, RuntimeException {
final Delete [] deletes = currentList.toArray(DELETE_ARRAY_TYPE);
return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
tableName, row) {
public Integer call() throws IOException {
return server.delete(location.getRegionInfo().getRegionName(),
deletes);
}
});
}
};
return b.process(list, tableName);
}
void close(boolean stopProxy) {
if (master != null) {
if (stopProxy) {
@ -1150,4 +1235,4 @@ public class HConnectionManager implements HConstants {
}
}
}
}
}

View File

@ -438,6 +438,23 @@ public class HTable implements HTableInterface {
);
}
/**
* Bulk commit a List of Deletes to the table.
* @param deletes List of deletes. List is modified by this method. On
* exception holds deletes that were NOT applied.
* @throws IOException
* @since 0.20.1
*/
public synchronized void delete(final ArrayList<Delete> deletes)
throws IOException {
int last = 0;
try {
last = connection.processBatchOfDeletes(deletes, this.tableName);
} finally {
deletes.subList(0, last).clear();
}
}
/**
* Commit a Put to the table.
* <p>
@ -464,12 +481,12 @@ public class HTable implements HTableInterface {
* @since 0.20.0
*/
public synchronized void put(final List<Put> puts) throws IOException {
for(Put put : puts) {
for (Put put : puts) {
validatePut(put);
writeBuffer.add(put);
currentWriteBufferSize += put.heapSize();
}
if(autoFlush || currentWriteBufferSize > writeBufferSize) {
if (autoFlush || currentWriteBufferSize > writeBufferSize) {
flushCommits();
}
}

View File

@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
* for each column to be inserted, execute {@link #add(byte[], byte[], byte[]) add} or
* {@link #add(byte[], byte[], long, byte[]) add} if setting the timestamp.
*/
public class Put implements HeapSize, Writable, Comparable<Put> {
public class Put implements HeapSize, Writable, Row, Comparable<Row> {
private byte [] row = null;
private long timestamp = HConstants.LATEST_TIMESTAMP;
private long lockId = -1L;
@ -158,10 +158,10 @@ public class Put implements HeapSize, Writable, Comparable<Put> {
int res = Bytes.compareTo(this.row, 0, row.length,
kv.getBuffer(), kv.getRowOffset(), kv.getRowLength());
if(res != 0) {
throw new IOException("The row in the recently added KeyValue " +
Bytes.toStringBinary(kv.getBuffer(), kv.getRowOffset(),
kv.getRowLength()) + " doesn't match the original one " +
Bytes.toStringBinary(this.row));
throw new IOException("The row in the recently added KeyValue " +
Bytes.toStringBinary(kv.getBuffer(), kv.getRowOffset(),
kv.getRowLength()) + " doesn't match the original one " +
Bytes.toStringBinary(this.row));
}
list.add(kv);
familyMap.put(family, list);
@ -293,7 +293,7 @@ public class Put implements HeapSize, Writable, Comparable<Put> {
return sb.toString();
}
public int compareTo(Put p) {
public int compareTo(Row p) {
return Bytes.compareTo(this.getRow(), p.getRow());
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
/**
* Has a row.
*/
interface Row {
/**
* @return The row.
*/
public byte [] getRow();
}

View File

@ -151,6 +151,8 @@ public class HbaseObjectWritable implements Writable, Configurable {
addToMap(QualifierFilter.class, code++);
addToMap(SkipFilter.class, code++);
addToMap(WritableByteArrayComparable.class, code++);
addToMap(Delete [].class, code++);
}
private Class<?> declaredClass;

View File

@ -108,8 +108,7 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion {
*/
public int put(final byte[] regionName, final Put [] puts)
throws IOException;
/**
* Deletes all the KeyValues that match those found in the Delete object,
* if their ts <= to the Delete. In case of a delete with a specific ts it
@ -120,7 +119,19 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion {
*/
public void delete(final byte[] regionName, final Delete delete)
throws IOException;
/**
* Put an array of deletes into the specified region
*
* @param regionName
* @param deletes
* @return The number of processed deletes. Returns -1 if all Deletes
* processed successfully.
* @throws IOException
*/
public int delete(final byte[] regionName, final Delete [] deletes)
throws IOException;
/**
* Atomically checks if a row/family/qualifier value match the expectedValue.
* If it does, it adds the put.

View File

@ -1798,7 +1798,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
throw convertThrowableToIOE(cleanup(t));
}
}
public int put(final byte[] regionName, final Put [] puts)
throws IOException {
// Count of Puts processed.
@ -1824,7 +1824,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
// All have been processed successfully.
return -1;
}
/**
*
@ -1989,7 +1988,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
//
// Methods that do the actual work for the remote API
//
public void delete(final byte [] regionName, final Delete delete)
throws IOException {
checkOpen();
@ -2006,7 +2004,33 @@ public class HRegionServer implements HConstants, HRegionInterface,
throw convertThrowableToIOE(cleanup(t));
}
}
public int delete(final byte[] regionName, final Delete [] deletes)
throws IOException {
// Count of Deletes processed.
int i = 0;
checkOpen();
try {
boolean writeToWAL = true;
this.cacheFlusher.reclaimMemStoreMemory();
Integer[] locks = new Integer[deletes.length];
HRegion region = getRegion(regionName);
for (i = 0; i < deletes.length; i++) {
this.requestCount.incrementAndGet();
locks[i] = getLockFromId(deletes[i].getLockId());
region.delete(deletes[i], locks[i], writeToWAL);
}
} catch (WrongRegionException ex) {
LOG.debug("Batch deletes: " + i, ex);
return i;
} catch (NotServingRegionException ex) {
return i;
} catch (Throwable t) {
throw convertThrowableToIOE(cleanup(t));
}
// All have been processed successfully.
return -1;
}
public long lockRow(byte [] regionName, byte [] row)
throws IOException {

View File

@ -21,6 +21,7 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -49,7 +50,7 @@ public class TestClient extends HBaseClusterTestCase {
super();
}
public void XtestSuperSimple() throws Exception {
public void testSuperSimple() throws Exception {
byte [] TABLE = Bytes.toBytes("testSuperSimple");
HTable ht = createTable(TABLE, FAMILY);
Put put = new Put(ROW);
@ -1071,7 +1072,6 @@ public class TestClient extends HBaseClusterTestCase {
}
public void testDeletes() throws Exception {
byte [] TABLE = Bytes.toBytes("testDeletes");
byte [][] ROWS = makeNAscii(ROW, 6);
@ -1318,8 +1318,38 @@ public class TestClient extends HBaseClusterTestCase {
assertTrue(Bytes.equals(result.sorted()[0].getValue(), VALUES[1]));
assertTrue(Bytes.equals(result.sorted()[1].getValue(), VALUES[2]));
scanner.close();
// Add test of bulk deleting.
for (int i = 0; i < 10; i++) {
byte [] bytes = Bytes.toBytes(i);
put = new Put(bytes);
put.add(FAMILIES[0], QUALIFIER, bytes);
ht.put(put);
}
for (int i = 0; i < 10; i++) {
byte [] bytes = Bytes.toBytes(i);
get = new Get(bytes);
get.addFamily(FAMILIES[0]);
result = ht.get(get);
assertTrue(result.size() == 1);
}
ArrayList<Delete> deletes = new ArrayList<Delete>();
for (int i = 0; i < 10; i++) {
byte [] bytes = Bytes.toBytes(i);
delete = new Delete(bytes);
delete.deleteFamily(FAMILIES[0]);
deletes.add(delete);
}
ht.delete(deletes);
for (int i = 0; i < 10; i++) {
byte [] bytes = Bytes.toBytes(i);
get = new Get(bytes);
get.addFamily(FAMILIES[0]);
result = ht.get(get);
assertTrue(result.size() == 0);
}
}
/**
* Baseline "scalability" test.
*