HBASE-963 Fix the retries in HTable.flushCommit
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@709320 13f79535-47bb-0310-9956-ffa450edef68
parent 8cfa1ce45b
commit e352c5dfe1
@@ -55,6 +55,7 @@ Release 0.19.0 - Unreleased
   HBASE-971   Fix the failing tests on Hudson
   HBASE-973   [doc] In getting started, make it clear that hbase needs to
               create its directory in hdfs
   HBASE-963   Fix the retries in HTable.flushCommit

  IMPROVEMENTS
   HBASE-901   Add a limit to key length, check key and value length on client side
@@ -20,11 +20,13 @@
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
@@ -150,4 +152,15 @@ public interface HConnection {
   */
  public <T> T getRegionServerForWithoutRetries(ServerCallable<T> callable)
  throws IOException, RuntimeException;

  /**
   * Process a batch of rows. Currently it only works for updates until
   * HBASE-880 is available. Does the retries.
   * @param list A batch of rows to process
   * @param tableName The name of the table
   * @throws IOException
   */
  public void processBatchOfRows(ArrayList<BatchUpdate> list, byte[] tableName)
  throws IOException;
}
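The two declarations above pair a single-attempt call with a retrying one: getRegionServerForWithoutRetries() runs a ServerCallable once against the region server hosting the callable's row, while processBatchOfRows() is documented as doing the retries itself. Below is a minimal sketch of how a caller might drive the non-retrying variant, modeled on the old HTable.flushCommits() body shown later in this diff; the class name BatchUpdateExample, the helper sendOnce(), and the exact import paths are illustrative assumptions, not part of the commit.

import java.io.IOException;

import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.io.BatchUpdate;

public class BatchUpdateExample {

  /**
   * Sends one batch destined for a single region, making exactly one attempt.
   * Judging from the code in this diff, the return value is -1 when the whole
   * batch was applied, and otherwise an index into the batch where it stopped.
   */
  static int sendOnce(final HConnection connection, final byte[] tableName,
      final BatchUpdate[] updates) throws IOException {
    return connection.getRegionServerForWithoutRetries(
        new ServerCallable<Integer>(connection, tableName, updates[0].getRow()) {
          public Integer call() throws IOException {
            // 'server' and 'location' are fields set up by ServerCallable
            // before call() runs.
            return server.batchUpdates(
                location.getRegionInfo().getRegionName(), updates);
          }
        });
  }
}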
@@ -934,7 +934,7 @@ public class HConnectionManager implements HConstants {
              return i;
            }
          });
        if (index != updates.length - 1) {
        if (index != -1) {
          if (tries == numRetries - 1) {
            throw new RetriesExhaustedException("Some server",
              currentRegion, batchUpdate.getRow(),
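The -934,7 +934,7 header marks a one-line change, and the two adjacent conditions show what it is: the test on the index returned by the batch call changes from index != updates.length - 1 to index != -1, matching the convention used in HTable.flushCommits() below, where -1 signals that the whole batch was applied. A hypothetical skeleton of the retry loop this condition lives in, reconstructed only from the fragments visible in this diff, might look as follows; RetrySketch, processWithRetries(), and the sendOnce() stand-in are assumptions, and the real code throws RetriesExhaustedException rather than a plain IOException.

import java.io.IOException;

public class RetrySketch {

  /** Retries a single-region batch until it is fully applied or attempts run out. */
  static void processWithRetries(int numRetries) throws IOException {
    for (int tries = 0; tries < numRetries; tries++) {
      int index = sendOnce();        // one attempt, no retries inside the call
      if (index == -1) {
        return;                      // whole batch applied: the fixed success test
      }
      if (tries == numRetries - 1) {
        // out of attempts; HConnectionManager throws RetriesExhaustedException here
        throw new IOException("retries exhausted after " + numRetries + " attempts");
      }
      try {
        Thread.sleep(1000);          // basic wait between attempts, as in the old flushCommits()
      } catch (InterruptedException e) {
        // ignore and try again
      }
      // the real code would also drop the already-applied updates before retrying
    }
  }

  /** Stand-in for the single-attempt batch call; always reports full success. */
  static int sendOnce() {
    return -1;
  }
}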
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -1029,60 +1028,20 @@ public class HTable {
   */
  public void flushCommits() throws IOException {
    try {
      // See HBASE-748 for pseudo code of this method
      if (writeBuffer.isEmpty()) {
        return;
      }
      Collections.sort(writeBuffer);
      List<BatchUpdate> tempUpdates = new ArrayList<BatchUpdate>();
      byte[] currentRegion = connection.getRegionLocation(tableName,
        writeBuffer.get(0).getRow(), false).getRegionInfo().getRegionName();
      byte[] region = currentRegion;
      boolean isLastRow = false;
      for (int i = 0; i < writeBuffer.size(); i++) {
        BatchUpdate batchUpdate = writeBuffer.get(i);
        tempUpdates.add(batchUpdate);
        isLastRow = (i + 1) == writeBuffer.size();
        if (!isLastRow) {
          region = connection.getRegionLocation(tableName,
            writeBuffer.get(i + 1).getRow(), false).getRegionInfo()
            .getRegionName();
        }
        if (!Bytes.equals(currentRegion, region) || isLastRow) {
          final BatchUpdate[] updates = tempUpdates.toArray(new BatchUpdate[0]);
          int index = connection
            .getRegionServerForWithoutRetries(new ServerCallable<Integer>(
                connection, tableName, batchUpdate.getRow()) {
              public Integer call() throws IOException {
                int i = server.batchUpdates(location.getRegionInfo()
                  .getRegionName(), updates);
                return i;
              }
            });
          if (index != -1) {
            // Basic waiting time. If many updates are flushed, tests have shown
            // that this is barely needed but when commiting 1 update this may
            // get retried hundreds of times.
            try {
              Thread.sleep(1000);
            } catch (InterruptedException e) {
              // continue
            }
            i = i - updates.length + index;
            region = connection.getRegionLocation(tableName,
              writeBuffer.get(i + 1).getRow(), true).getRegionInfo()
              .getRegionName();
          }
          currentRegion = region;
          tempUpdates.clear();
        }
      }
      connection.processBatchOfRows(writeBuffer, tableName);
    } finally {
      currentWriteBufferSize = 0;
      writeBuffer.clear();
    }
  }

  /**
   * Release held resources
   *
   */
  public void close() throws IOException {
    flushCommits();
  }

  /**
   * Utility method that checks rows existence, length and columns well
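With this change flushCommits() appears to shrink from the per-region loop above to little more than the single connection.processBatchOfRows(writeBuffer, tableName) call, with the write buffer always cleared in the finally block; the sorting, region lookups, sleeps, and retry bookkeeping now live behind the HConnection interface, whose javadoc above says processBatchOfRows does the retries. A rough sketch of how client code exercises the write buffer this method drains, written against the 0.19-era API as best understood; the table name, column, and row keys are placeholders, and signatures such as new HTable(conf, name), setAutoFlush(), and BatchUpdate.put() should be checked against the actual release.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.util.Bytes;

public class WriteBufferExample {
  public static void main(String[] args) throws IOException {
    HTable table = new HTable(new HBaseConfiguration(), "mytable");
    // Queue commits on the client side instead of sending each one immediately.
    table.setAutoFlush(false);
    for (int i = 0; i < 1000; i++) {
      BatchUpdate update = new BatchUpdate("row" + i);
      update.put("family:qualifier", Bytes.toBytes("value" + i));
      table.commit(update);          // buffered, not yet sent
    }
    // Drains the write buffer; after this commit the retries happen inside
    // HConnection.processBatchOfRows() rather than in flushCommits() itself.
    table.flushCommits();
    table.close();                   // close() flushes as well, per the diff above
  }
}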