HBASE-9521 clean clearBufferOnFail behavior and deprecate it
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1523782 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
74da3e4480
commit
027f7a708f
|
@ -1361,42 +1361,26 @@ public class HTable implements HTableInterface {
|
|||
}
|
||||
|
||||
/**
|
||||
* See {@link #setAutoFlush(boolean, boolean)}
|
||||
*
|
||||
* @param autoFlush
|
||||
* Whether or not to enable 'auto-flush'.
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Deprecated
|
||||
@Override
|
||||
public void setAutoFlush(boolean autoFlush) {
|
||||
setAutoFlush(autoFlush, autoFlush);
|
||||
}
|
||||
|
||||
/**
|
||||
* Turns 'auto-flush' on or off.
|
||||
* <p>
|
||||
* When enabled (default), {@link Put} operations don't get buffered/delayed
|
||||
* and are immediately executed. Failed operations are not retried. This is
|
||||
* slower but safer.
|
||||
* <p>
|
||||
* Turning off {@link #autoFlush} means that multiple {@link Put}s will be
|
||||
* accepted before any RPC is actually sent to do the write operations. If the
|
||||
* application dies before pending writes get flushed to HBase, data will be
|
||||
* lost.
|
||||
* <p>
|
||||
* When you turn {@link #autoFlush} off, you should also consider the
|
||||
* {@link #clearBufferOnFail} option. By default, asynchronous {@link Put}
|
||||
* requests will be retried on failure until successful. However, this can
|
||||
* pollute the writeBuffer and slow down batching performance. Additionally,
|
||||
* you may want to issue a number of Put requests and call
|
||||
* {@link #flushCommits()} as a barrier. In both use cases, consider setting
|
||||
* clearBufferOnFail to true to erase the buffer after {@link #flushCommits()}
|
||||
* has been called, regardless of success.
|
||||
*
|
||||
* @param autoFlush
|
||||
* Whether or not to enable 'auto-flush'.
|
||||
* @param clearBufferOnFail
|
||||
* Whether to keep Put failures in the writeBuffer
|
||||
* @see #flushCommits
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void setAutoFlushTo(boolean autoFlush) {
|
||||
setAutoFlush(autoFlush, clearBufferOnFail);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
|
||||
this.autoFlush = autoFlush;
|
||||
this.clearBufferOnFail = autoFlush || clearBufferOnFail;
|
||||
|
@ -1409,6 +1393,7 @@ public class HTable implements HTableInterface {
|
|||
* {@code hbase.client.write.buffer}.
|
||||
* @return The size of the write buffer in bytes.
|
||||
*/
|
||||
@Override
|
||||
public long getWriteBufferSize() {
|
||||
return writeBufferSize;
|
||||
}
|
||||
|
|
|
@ -511,8 +511,13 @@ public interface HTableInterface extends Closeable {
|
|||
* See {@link #setAutoFlush(boolean, boolean)}
|
||||
*
|
||||
* @param autoFlush
|
||||
* Whether or not to enable 'auto-flush'.
|
||||
* Whether or not to enable 'auto-flush'.
|
||||
* @deprecated in 0.96. When called with setAutoFlush(false), this function also
|
||||
* set clearBufferOnFail to true, which is unexpected but kept for historical reasons.
|
||||
* Replace it with setAutoFlush(false, false) if this is exactly what you want, or by
|
||||
* {@link #setAutoFlushTo(boolean)} for all other cases.
|
||||
*/
|
||||
@Deprecated
|
||||
void setAutoFlush(boolean autoFlush);
|
||||
|
||||
/**
|
||||
|
@ -522,28 +527,39 @@ public interface HTableInterface extends Closeable {
|
|||
* and are immediately executed. Failed operations are not retried. This is
|
||||
* slower but safer.
|
||||
* <p>
|
||||
* Turning off {@code autoFlush} means that multiple {@link Put}s will be
|
||||
* Turning off {@code #autoFlush} means that multiple {@link Put}s will be
|
||||
* accepted before any RPC is actually sent to do the write operations. If the
|
||||
* application dies before pending writes get flushed to HBase, data will be
|
||||
* lost.
|
||||
* <p>
|
||||
* When you turn {@code #autoFlush} off, you should also consider the
|
||||
* {@code clearBufferOnFail} option. By default, asynchronous {@link Put}
|
||||
* {@code #clearBufferOnFail} option. By default, asynchronous {@link Put}
|
||||
* requests will be retried on failure until successful. However, this can
|
||||
* pollute the writeBuffer and slow down batching performance. Additionally,
|
||||
* you may want to issue a number of Put requests and call
|
||||
* {@link #flushCommits()} as a barrier. In both use cases, consider setting
|
||||
* clearBufferOnFail to true to erase the buffer after {@link #flushCommits()}
|
||||
* has been called, regardless of success.
|
||||
* <p>
|
||||
* In other words, if you call {@code #setAutoFlush(false)}; HBase will retry N time for each
|
||||
* flushCommit, including the last one when closing the table. This is NOT recommended,
|
||||
* most of the time you want to call {@code #setAutoFlush(false, true)}.
|
||||
*
|
||||
* @param autoFlush
|
||||
* Whether or not to enable 'auto-flush'.
|
||||
* Whether or not to enable 'auto-flush'.
|
||||
* @param clearBufferOnFail
|
||||
* Whether to keep Put failures in the writeBuffer
|
||||
* Whether to keep Put failures in the writeBuffer. If autoFlush is true, then
|
||||
* the value of this parameter is ignored and clearBufferOnFail is set to true.
|
||||
* Setting clearBufferOnFail to false is deprecated since 0.96.
|
||||
* @see #flushCommits
|
||||
*/
|
||||
void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail);
|
||||
|
||||
/**
|
||||
* Set the autoFlush behavior, without changing the value of {@code clearBufferOnFail}
|
||||
*/
|
||||
void setAutoFlushTo(boolean autoFlush);
|
||||
|
||||
/**
|
||||
* Returns the maximum size in bytes of the write buffer for this HTable.
|
||||
* <p>
|
||||
|
|
|
@ -569,7 +569,7 @@ public class HTablePool implements Closeable {
|
|||
@Override
|
||||
public void setAutoFlush(boolean autoFlush) {
|
||||
checkState();
|
||||
table.setAutoFlush(autoFlush);
|
||||
table.setAutoFlush(autoFlush, autoFlush);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -578,6 +578,11 @@ public class HTablePool implements Closeable {
|
|||
table.setAutoFlush(autoFlush, clearBufferOnFail);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAutoFlushTo(boolean autoFlush) {
|
||||
table.setAutoFlushTo(autoFlush);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteBufferSize() {
|
||||
checkState();
|
||||
|
|
|
@ -600,7 +600,7 @@ public class TestAsyncProcess {
|
|||
|
||||
|
||||
Put p = createPut(true, false);
|
||||
ht.setAutoFlush(false);
|
||||
ht.setAutoFlush(false, false);
|
||||
ht.put(p);
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
|
|
|
@ -335,7 +335,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
id = Bytes.toBytes(UUID.randomUUID().toString());
|
||||
Configuration conf = context.getConfiguration();
|
||||
table = new HTable(conf, getTableName(conf));
|
||||
table.setAutoFlush(false);
|
||||
table.setAutoFlush(false, true);
|
||||
table.setWriteBufferSize(4 * 1024 * 1024);
|
||||
this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
|
||||
current = new byte[this.width][];
|
||||
|
|
|
@ -177,7 +177,7 @@ public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
|
|||
numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);
|
||||
table = new HTable(conf, tableName);
|
||||
table.setWriteBufferSize(4*1024*1024);
|
||||
table.setAutoFlush(false);
|
||||
table.setAutoFlush(false, true);
|
||||
|
||||
String taskId = conf.get("mapred.task.id");
|
||||
Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);
|
||||
|
|
|
@ -241,7 +241,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
|
|||
for (int x = 0; x < 5000; x++) {
|
||||
TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS);
|
||||
try {
|
||||
ht.setAutoFlush(false);
|
||||
ht.setAutoFlush(false, true);
|
||||
for (int i = 0; i < 5; i++) {
|
||||
long rk = random.nextLong();
|
||||
rowKeys.add(rk);
|
||||
|
|
|
@ -547,7 +547,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
|
||||
@Override
|
||||
public void setAutoFlush(boolean autoFlush) {
|
||||
table.setAutoFlush(autoFlush);
|
||||
table.setAutoFlush(autoFlush, autoFlush);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -555,6 +555,11 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
table.setAutoFlush(autoFlush, clearBufferOnFail);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAutoFlushTo(boolean autoFlush) {
|
||||
table.setAutoFlushTo(autoFlush);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteBufferSize() {
|
||||
return table.getWriteBufferSize();
|
||||
|
|
|
@ -89,7 +89,7 @@ FileOutputFormat<ImmutableBytesWritable, Put> {
|
|||
LOG.error(e);
|
||||
throw e;
|
||||
}
|
||||
table.setAutoFlush(false);
|
||||
table.setAutoFlush(false, true);
|
||||
return new TableRecordWriter(table);
|
||||
}
|
||||
|
||||
|
|
|
@ -103,7 +103,7 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable,
|
|||
if (!tables.containsKey(tableName)) {
|
||||
LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get())+ "\" for writing");
|
||||
HTable table = new HTable(conf, tableName.get());
|
||||
table.setAutoFlush(false);
|
||||
table.setAutoFlush(false, true);
|
||||
tables.put(tableName, table);
|
||||
}
|
||||
return tables.get(tableName);
|
||||
|
|
|
@ -204,7 +204,7 @@ implements Configurable {
|
|||
this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
|
||||
}
|
||||
this.table = new HTable(this.conf, tableName);
|
||||
this.table.setAutoFlush(false);
|
||||
this.table.setAutoFlush(false, true);
|
||||
LOG.info("Created table instance for " + tableName);
|
||||
} catch(IOException e) {
|
||||
LOG.error(e);
|
||||
|
|
|
@ -770,6 +770,11 @@ public class RemoteHTable implements HTableInterface {
|
|||
throw new UnsupportedOperationException("setAutoFlush not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAutoFlushTo(boolean autoFlush) {
|
||||
throw new UnsupportedOperationException("setAutoFlushTo not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteBufferSize() {
|
||||
throw new UnsupportedOperationException("getWriteBufferSize not implemented");
|
||||
|
|
|
@ -1580,7 +1580,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
* @throws IOException
|
||||
*/
|
||||
public int loadTable(final HTable t, final byte[] f) throws IOException {
|
||||
t.setAutoFlush(false);
|
||||
t.setAutoFlush(false, true);
|
||||
byte[] k = new byte[3];
|
||||
int rowCount = 0;
|
||||
for (byte b1 = 'a'; b1 <= 'z'; b1++) {
|
||||
|
@ -1608,7 +1608,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
* @throws IOException
|
||||
*/
|
||||
public int loadTable(final HTable t, final byte[][] f) throws IOException {
|
||||
t.setAutoFlush(false);
|
||||
t.setAutoFlush(false, true);
|
||||
byte[] k = new byte[3];
|
||||
int rowCount = 0;
|
||||
for (byte b1 = 'a'; b1 <= 'z'; b1++) {
|
||||
|
|
|
@ -858,7 +858,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
|
||||
void testSetup() throws IOException {
|
||||
this.table = connection.getTable(tableName);
|
||||
this.table.setAutoFlush(false);
|
||||
this.table.setAutoFlush(false, true);
|
||||
}
|
||||
|
||||
void testTakedown() throws IOException {
|
||||
|
|
|
@ -3874,7 +3874,7 @@ public class TestFromClientSide {
|
|||
final int NB_BATCH_ROWS = 10;
|
||||
HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedOneFlush"),
|
||||
new byte [][] {CONTENTS_FAMILY, SMALL_FAMILY});
|
||||
table.setAutoFlush(false);
|
||||
table.setAutoFlush(false, true);
|
||||
ArrayList<Put> rowsUpdate = new ArrayList<Put>();
|
||||
for (int i = 0; i < NB_BATCH_ROWS * 10; i++) {
|
||||
byte[] row = Bytes.toBytes("row" + i);
|
||||
|
@ -3915,7 +3915,7 @@ public class TestFromClientSide {
|
|||
final int NB_BATCH_ROWS = 10;
|
||||
HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedManyManyFlushes"),
|
||||
new byte[][] {CONTENTS_FAMILY, SMALL_FAMILY });
|
||||
table.setAutoFlush(false);
|
||||
table.setAutoFlush(false, true);
|
||||
table.setWriteBufferSize(10);
|
||||
ArrayList<Put> rowsUpdate = new ArrayList<Put>();
|
||||
for (int i = 0; i < NB_BATCH_ROWS * 10; i++) {
|
||||
|
@ -4576,7 +4576,6 @@ public class TestFromClientSide {
|
|||
|
||||
HTable table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY },
|
||||
conf, Integer.MAX_VALUE);
|
||||
table.setAutoFlush(true);
|
||||
|
||||
final long ts = EnvironmentEdgeManager.currentTimeMillis();
|
||||
Get get = new Get(ROW);
|
||||
|
@ -4614,7 +4613,6 @@ public class TestFromClientSide {
|
|||
|
||||
final HTable table = TEST_UTIL.createTable(tableName,
|
||||
new byte[][] { FAMILY }, conf, 3);
|
||||
table.setAutoFlush(true);
|
||||
|
||||
final long ts = EnvironmentEdgeManager.currentTimeMillis();
|
||||
final Get get = new Get(ROW);
|
||||
|
|
|
@ -69,7 +69,7 @@ public class TestHTableUtil {
|
|||
public void testBucketPut() throws Exception {
|
||||
byte [] TABLE = Bytes.toBytes("testBucketPut");
|
||||
HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
|
||||
ht.setAutoFlush( false );
|
||||
ht.setAutoFlush(false, true);
|
||||
|
||||
List<Put> puts = new ArrayList<Put>();
|
||||
puts.add( createPut("row1") );
|
||||
|
|
|
@ -252,7 +252,7 @@ public class TestMultiParallel {
|
|||
// Load the data
|
||||
LOG.info("get new table");
|
||||
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
|
||||
table.setAutoFlush(false);
|
||||
table.setAutoFlush(false, true);
|
||||
table.setWriteBufferSize(10 * 1024 * 1024);
|
||||
|
||||
LOG.info("constructPutRequests");
|
||||
|
|
|
@ -79,7 +79,6 @@ public class TestConstraint {
|
|||
|
||||
util.getHBaseAdmin().createTable(desc);
|
||||
HTable table = new HTable(util.getConfiguration(), tableName);
|
||||
table.setAutoFlush(true);
|
||||
|
||||
// test that we don't fail on a valid put
|
||||
Put put = new Put(row1);
|
||||
|
@ -110,7 +109,6 @@ public class TestConstraint {
|
|||
|
||||
util.getHBaseAdmin().createTable(desc);
|
||||
HTable table = new HTable(util.getConfiguration(), tableName);
|
||||
table.setAutoFlush(true);
|
||||
|
||||
// test that we do fail on violation
|
||||
Put put = new Put(row1);
|
||||
|
@ -154,7 +152,6 @@ public class TestConstraint {
|
|||
|
||||
util.getHBaseAdmin().createTable(desc);
|
||||
HTable table = new HTable(util.getConfiguration(), tableName);
|
||||
table.setAutoFlush(true);
|
||||
|
||||
// test that we don't fail because its disabled
|
||||
Put put = new Put(row1);
|
||||
|
@ -185,7 +182,6 @@ public class TestConstraint {
|
|||
|
||||
util.getHBaseAdmin().createTable(desc);
|
||||
HTable table = new HTable(util.getConfiguration(), tableName);
|
||||
table.setAutoFlush(true);
|
||||
|
||||
// test that we do fail on violation
|
||||
Put put = new Put(row1);
|
||||
|
@ -216,7 +212,6 @@ public class TestConstraint {
|
|||
|
||||
util.getHBaseAdmin().createTable(desc);
|
||||
HTable table = new HTable(util.getConfiguration(), tableName);
|
||||
table.setAutoFlush(true);
|
||||
|
||||
// test that we do fail on violation
|
||||
Put put = new Put(row1);
|
||||
|
|
|
@ -817,7 +817,7 @@ public class TestDistributedLogSplitting {
|
|||
if (key == null || key.length == 0) {
|
||||
key = new byte[] { 0, 0, 0, 0, 1 };
|
||||
}
|
||||
ht.setAutoFlush(true);
|
||||
ht.setAutoFlush(true, true);
|
||||
Put put = new Put(key);
|
||||
put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'});
|
||||
ht.put(put);
|
||||
|
@ -1229,7 +1229,7 @@ public class TestDistributedLogSplitting {
|
|||
* Load table with puts and deletes with expected values so that we can verify later
|
||||
*/
|
||||
private void prepareData(final HTable t, final byte[] f, final byte[] column) throws IOException {
|
||||
t.setAutoFlush(false);
|
||||
t.setAutoFlush(false, true);
|
||||
byte[] k = new byte[3];
|
||||
|
||||
// add puts
|
||||
|
|
|
@ -324,7 +324,7 @@ public class TestRegionServerMetrics {
|
|||
|
||||
TEST_UTIL.createTable(tableName, cf);
|
||||
HTable t = new HTable(conf, tableName);
|
||||
t.setAutoFlush(false);
|
||||
t.setAutoFlush(false, true);
|
||||
for (int insertCount =0; insertCount < 100; insertCount++) {
|
||||
Put p = new Put(Bytes.toBytes("" + insertCount + "row"));
|
||||
p.add(cf, qualifier, val);
|
||||
|
|
|
@ -478,7 +478,7 @@ public class TestLogRolling {
|
|||
|
||||
writeData(table, 1002);
|
||||
|
||||
table.setAutoFlush(true);
|
||||
table.setAutoFlush(true, true);
|
||||
|
||||
long curTime = System.currentTimeMillis();
|
||||
long oldFilenum = log.getFilenum();
|
||||
|
|
|
@ -54,7 +54,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
|
|||
*/
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
htable1.setAutoFlush(true);
|
||||
htable1.setAutoFlush(false, true);
|
||||
// Starting and stopping replication can make us miss new logs,
|
||||
// rolling like this makes sure the most recent one gets added to the queue
|
||||
for (JVMClusterUtil.RegionServerThread r :
|
||||
|
|
|
@ -67,7 +67,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
|
|||
*/
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
htable1.setAutoFlush(true);
|
||||
htable1.setAutoFlush(true, true);
|
||||
// Starting and stopping replication can make us miss new logs,
|
||||
// rolling like this makes sure the most recent one gets added to the queue
|
||||
for ( JVMClusterUtil.RegionServerThread r :
|
||||
|
@ -245,7 +245,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
|
|||
LOG.info("testSmallBatch");
|
||||
Put put;
|
||||
// normal Batch tests
|
||||
htable1.setAutoFlush(false);
|
||||
htable1.setAutoFlush(false, true);
|
||||
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
|
||||
put = new Put(Bytes.toBytes(i));
|
||||
put.add(famName, row, row);
|
||||
|
@ -384,7 +384,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
|
|||
@Test(timeout=300000)
|
||||
public void loadTesting() throws Exception {
|
||||
htable1.setWriteBufferSize(1024);
|
||||
htable1.setAutoFlush(false);
|
||||
htable1.setAutoFlush(false, true);
|
||||
for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
|
||||
Put put = new Put(Bytes.toBytes(i));
|
||||
put.add(famName, row, row);
|
||||
|
|
|
@ -503,7 +503,7 @@ public class SnapshotTestingUtils {
|
|||
|
||||
public static void loadData(final HBaseTestingUtility util, final HTable table, int rows,
|
||||
byte[]... families) throws IOException, InterruptedException {
|
||||
table.setAutoFlush(false);
|
||||
table.setAutoFlush(false, true);
|
||||
|
||||
// Ensure one row per region
|
||||
assertTrue(rows >= 16);
|
||||
|
|
Loading…
Reference in New Issue