HBASE-1780 HTable.flushCommits clears write buffer in finally clause
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@808579 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
34dd385f1f
commit
7fe79fa71c
|
@ -357,6 +357,7 @@ Release 0.20.0 - Unreleased
|
||||||
new updates into the 'kvset' (active part)
|
new updates into the 'kvset' (active part)
|
||||||
HBASE-1767 test zookeeper broken in trunk and 0.20 branch; broken on
|
HBASE-1767 test zookeeper broken in trunk and 0.20 branch; broken on
|
||||||
hudson too
|
hudson too
|
||||||
|
HBASE-1780 HTable.flushCommits clears write buffer in finally clause
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
|
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
|
||||||
|
|
|
@ -189,6 +189,6 @@ public interface HConnection {
|
||||||
* @param tableName The name of the table
|
* @param tableName The name of the table
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void processBatchOfRows(ArrayList<Put> list, byte[] tableName)
|
public int processBatchOfRows(ArrayList<Put> list, byte[] tableName)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
}
|
}
|
|
@ -997,10 +997,10 @@ public class HConnectionManager implements HConstants {
|
||||||
return location;
|
return location;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void processBatchOfRows(ArrayList<Put> list, byte[] tableName)
|
public int processBatchOfRows(ArrayList<Put> list, byte[] tableName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (list.isEmpty()) {
|
if (list.isEmpty()) {
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
boolean retryOnlyOne = false;
|
boolean retryOnlyOne = false;
|
||||||
if (list.size() > 1) {
|
if (list.size() > 1) {
|
||||||
|
@ -1014,7 +1014,8 @@ public class HConnectionManager implements HConstants {
|
||||||
byte [] region = currentRegion;
|
byte [] region = currentRegion;
|
||||||
boolean isLastRow = false;
|
boolean isLastRow = false;
|
||||||
Put [] putarray = new Put[0];
|
Put [] putarray = new Put[0];
|
||||||
for (int i = 0, tries = 0; i < list.size() && tries < this.numRetries; i++) {
|
int i, tries;
|
||||||
|
for (i = 0, tries = 0; i < list.size() && tries < this.numRetries; i++) {
|
||||||
Put put = list.get(i);
|
Put put = list.get(i);
|
||||||
currentPuts.add(put);
|
currentPuts.add(put);
|
||||||
// If the next Put goes to a new region, then we are to clear
|
// If the next Put goes to a new region, then we are to clear
|
||||||
|
@ -1072,6 +1073,7 @@ public class HConnectionManager implements HConstants {
|
||||||
currentPuts.clear();
|
currentPuts.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return i;
|
||||||
}
|
}
|
||||||
|
|
||||||
void close(boolean stopProxy) {
|
void close(boolean stopProxy) {
|
||||||
|
|
|
@ -54,14 +54,16 @@ import org.apache.hadoop.hbase.util.Writables;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to communicate with a single HBase table
|
* Used to communicate with a single HBase table.
|
||||||
|
* <p>
|
||||||
|
* This class is not MT safe for writes.
|
||||||
*/
|
*/
|
||||||
public class HTable implements HTableInterface {
|
public class HTable implements HTableInterface {
|
||||||
private final HConnection connection;
|
private final HConnection connection;
|
||||||
private final byte [] tableName;
|
private final byte [] tableName;
|
||||||
protected final int scannerTimeout;
|
protected final int scannerTimeout;
|
||||||
private volatile HBaseConfiguration configuration;
|
private volatile HBaseConfiguration configuration;
|
||||||
private ArrayList<Put> writeBuffer;
|
private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
|
||||||
private long writeBufferSize;
|
private long writeBufferSize;
|
||||||
private boolean autoFlush;
|
private boolean autoFlush;
|
||||||
private long currentWriteBufferSize;
|
private long currentWriteBufferSize;
|
||||||
|
@ -121,7 +123,6 @@ public class HTable implements HTableInterface {
|
||||||
conf.getInt("hbase.regionserver.lease.period", 60 * 1000);
|
conf.getInt("hbase.regionserver.lease.period", 60 * 1000);
|
||||||
this.configuration = conf;
|
this.configuration = conf;
|
||||||
this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
|
this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
|
||||||
this.writeBuffer = new ArrayList<Put>();
|
|
||||||
this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152);
|
this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152);
|
||||||
this.autoFlush = true;
|
this.autoFlush = true;
|
||||||
this.currentWriteBufferSize = 0;
|
this.currentWriteBufferSize = 0;
|
||||||
|
@ -578,11 +579,15 @@ public class HTable implements HTableInterface {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void flushCommits() throws IOException {
|
public void flushCommits() throws IOException {
|
||||||
|
int last = 0;
|
||||||
try {
|
try {
|
||||||
connection.processBatchOfRows(writeBuffer, tableName);
|
last = connection.processBatchOfRows(writeBuffer, tableName);
|
||||||
} finally {
|
} finally {
|
||||||
|
writeBuffer.subList(0, last).clear();
|
||||||
currentWriteBufferSize = 0;
|
currentWriteBufferSize = 0;
|
||||||
writeBuffer.clear();
|
for (int i = 0; i < writeBuffer.size(); i++) {
|
||||||
|
currentWriteBufferSize += writeBuffer.get(i).heapSize();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue