HBASE-12490 Replace uses of setAutoFlush(boolean, boolean) (Solomon Duskis)

This commit is contained in:
stack 2014-12-02 10:06:21 -08:00
parent 1a9b556474
commit 7a3396f0e1
18 changed files with 32 additions and 37 deletions

View File

@ -116,18 +116,17 @@ public class HTable implements HTableInterface, RegionLocator {
private TableConfiguration tableConfiguration; private TableConfiguration tableConfiguration;
protected List<Row> writeAsyncBuffer = new LinkedList<Row>(); protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
private long writeBufferSize; private long writeBufferSize;
private boolean clearBufferOnFail; private boolean clearBufferOnFail = true;
private boolean autoFlush; private boolean autoFlush = true;
protected long currentWriteBufferSize; protected long currentWriteBufferSize = 0 ;
private boolean closed = false;
protected int scannerCaching; protected int scannerCaching;
private ExecutorService pool; // For Multi & Scan private ExecutorService pool; // For Multi & Scan
private boolean closed;
private int operationTimeout; private int operationTimeout;
private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final boolean cleanupConnectionOnClose; // close the connection in close() private final boolean cleanupConnectionOnClose; // close the connection in close()
private Consistency defaultConsistency = Consistency.STRONG; private Consistency defaultConsistency = Consistency.STRONG;
/** The Async process for puts with autoflush set to false or multiputs */ /** The Async process for puts with autoflush set to false or multiputs */
protected AsyncProcess ap; protected AsyncProcess ap;
/** The Async process for batch */ /** The Async process for batch */
@ -326,9 +325,10 @@ public class HTable implements HTableInterface, RegionLocator {
/** /**
* For internal testing. * For internal testing.
* @throws IOException
*/ */
@VisibleForTesting @VisibleForTesting
protected HTable() { protected HTable() throws IOException {
tableName = null; tableName = null;
tableConfiguration = new TableConfiguration(); tableConfiguration = new TableConfiguration();
cleanupPoolOnClose = false; cleanupPoolOnClose = false;
@ -353,9 +353,6 @@ public class HTable implements HTableInterface, RegionLocator {
this.operationTimeout = tableName.isSystemTable() ? this.operationTimeout = tableName.isSystemTable() ?
tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout(); tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
this.writeBufferSize = tableConfiguration.getWriteBufferSize(); this.writeBufferSize = tableConfiguration.getWriteBufferSize();
this.clearBufferOnFail = true;
this.autoFlush = true;
this.currentWriteBufferSize = 0;
this.scannerCaching = tableConfiguration.getScannerCaching(); this.scannerCaching = tableConfiguration.getScannerCaching();
if (this.rpcCallerFactory == null) { if (this.rpcCallerFactory == null) {
@ -368,8 +365,6 @@ public class HTable implements HTableInterface, RegionLocator {
// puts need to track errors globally due to how the APIs currently work. // puts need to track errors globally due to how the APIs currently work.
ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory); ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
multiAp = this.connection.getAsyncProcess(); multiAp = this.connection.getAsyncProcess();
this.closed = false;
} }
/** /**

View File

@ -663,7 +663,7 @@ public class TestAsyncProcess {
HTable ht = new HTable(); HTable ht = new HTable();
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true); MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true);
ht.ap = ap; ht.ap = ap;
ht.setAutoFlush(true, true); ht.setAutoFlushTo(true);
if (bufferOn) { if (bufferOn) {
ht.setWriteBufferSize(1024L * 1024L); ht.setWriteBufferSize(1024L * 1024L);
} else { } else {
@ -711,7 +711,7 @@ public class TestAsyncProcess {
HTable ht = new HTable(); HTable ht = new HTable();
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true); MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true);
ht.ap = ap; ht.ap = ap;
ht.setAutoFlush(false, true); ht.setAutoFlushTo(false);
ht.setWriteBufferSize(0); ht.setWriteBufferSize(0);
Put p = createPut(1, false); Put p = createPut(1, false);
@ -739,7 +739,7 @@ public class TestAsyncProcess {
public void testWithNoClearOnFail() throws IOException { public void testWithNoClearOnFail() throws IOException {
HTable ht = new HTable(); HTable ht = new HTable();
ht.ap = new MyAsyncProcess(createHConnection(), conf, true); ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
ht.setAutoFlush(false, false); ht.setAutoFlush(false);
Put p = createPut(1, false); Put p = createPut(1, false);
ht.put(p); ht.put(p);
@ -806,7 +806,7 @@ public class TestAsyncProcess {
ht.ap.serverTrackerTimeout = 1; ht.ap.serverTrackerTimeout = 1;
Put p = createPut(1, false); Put p = createPut(1, false);
ht.setAutoFlush(false, false); ht.setAutoFlush(false);
ht.put(p); ht.put(p);
try { try {
@ -828,7 +828,7 @@ public class TestAsyncProcess {
Assert.assertNotNull(ht.ap.createServerErrorTracker()); Assert.assertNotNull(ht.ap.createServerErrorTracker());
Put p = createPut(1, true); Put p = createPut(1, true);
ht.setAutoFlush(false, false); ht.setAutoFlush(false);
ht.put(p); ht.put(p);
try { try {

View File

@ -363,7 +363,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
protected void instantiateHTable(Configuration conf) throws IOException { protected void instantiateHTable(Configuration conf) throws IOException {
table = new HTable(conf, getTableName(conf)); table = new HTable(conf, getTableName(conf));
table.setAutoFlush(false, true); table.setAutoFlushTo(false);
table.setWriteBufferSize(4 * 1024 * 1024); table.setWriteBufferSize(4 * 1024 * 1024);
} }

View File

@ -185,7 +185,7 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
protected void instantiateHTable(Configuration conf) throws IOException { protected void instantiateHTable(Configuration conf) throws IOException {
for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
HTable table = new HTable(conf, getTableName(i)); HTable table = new HTable(conf, getTableName(i));
table.setAutoFlush(true, true); table.setAutoFlushTo(true);
//table.setWriteBufferSize(4 * 1024 * 1024); //table.setWriteBufferSize(4 * 1024 * 1024);
this.tables[i] = table; this.tables[i] = table;
} }

View File

@ -181,7 +181,7 @@ public void cleanUpCluster() throws Exception {
numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT); numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);
table = new HTable(conf, TableName.valueOf(tableName)); table = new HTable(conf, TableName.valueOf(tableName));
table.setWriteBufferSize(4*1024*1024); table.setWriteBufferSize(4*1024*1024);
table.setAutoFlush(false, true); table.setAutoFlushTo(false);
String taskId = conf.get("mapreduce.task.attempt.id"); String taskId = conf.get("mapreduce.task.attempt.id");
Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId); Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);

View File

@ -239,7 +239,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
for (int x = 0; x < 5000; x++) { for (int x = 0; x < 5000; x++) {
TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS); TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS);
try { try {
ht.setAutoFlush(false, true); ht.setAutoFlushTo(false);
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
long rk = random.nextLong(); long rk = random.nextLong();
rowKeys.add(rk); rowKeys.add(rk);

View File

@ -909,7 +909,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testSetup() throws IOException { void testSetup() throws IOException {
this.table = connection.getTable(tableName); this.table = connection.getTable(tableName);
this.table.setAutoFlush(false, true); this.table.setAutoFlushTo(false);
} }
void testTakedown() throws IOException { void testTakedown() throws IOException {

View File

@ -104,7 +104,7 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable,
if (!tables.containsKey(tableName)) { if (!tables.containsKey(tableName)) {
LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get())+ "\" for writing"); LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get())+ "\" for writing");
HTable table = new HTable(conf, TableName.valueOf(tableName.get())); HTable table = new HTable(conf, TableName.valueOf(tableName.get()));
table.setAutoFlush(false, true); table.setAutoFlushTo(false);
tables.put(tableName, table); tables.put(tableName, table);
} }
return tables.get(tableName); return tables.get(tableName);

View File

@ -194,7 +194,7 @@ implements Configurable {
} }
this.connection = ConnectionFactory.createConnection(this.conf); this.connection = ConnectionFactory.createConnection(this.conf);
this.table = connection.getTable(TableName.valueOf(tableName)); this.table = connection.getTable(TableName.valueOf(tableName));
((HTable) this.table).setAutoFlush(false, true); this.table.setAutoFlushTo(false);
LOG.info("Created table instance for " + tableName); LOG.info("Created table instance for " + tableName);
} catch(IOException e) { } catch(IOException e) {
LOG.error(e); LOG.error(e);

View File

@ -3907,7 +3907,7 @@ public class TestFromClientSide {
final int NB_BATCH_ROWS = 10; final int NB_BATCH_ROWS = 10;
HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedOneFlush"), HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedOneFlush"),
new byte [][] {CONTENTS_FAMILY, SMALL_FAMILY}); new byte [][] {CONTENTS_FAMILY, SMALL_FAMILY});
table.setAutoFlush(false, true); table.setAutoFlushTo(false);
ArrayList<Put> rowsUpdate = new ArrayList<Put>(); ArrayList<Put> rowsUpdate = new ArrayList<Put>();
for (int i = 0; i < NB_BATCH_ROWS * 10; i++) { for (int i = 0; i < NB_BATCH_ROWS * 10; i++) {
byte[] row = Bytes.toBytes("row" + i); byte[] row = Bytes.toBytes("row" + i);
@ -3948,7 +3948,7 @@ public class TestFromClientSide {
final int NB_BATCH_ROWS = 10; final int NB_BATCH_ROWS = 10;
HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedManyManyFlushes"), HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedManyManyFlushes"),
new byte[][] {CONTENTS_FAMILY, SMALL_FAMILY }); new byte[][] {CONTENTS_FAMILY, SMALL_FAMILY });
table.setAutoFlush(false, true); table.setAutoFlushTo(false);
table.setWriteBufferSize(10); table.setWriteBufferSize(10);
ArrayList<Put> rowsUpdate = new ArrayList<Put>(); ArrayList<Put> rowsUpdate = new ArrayList<Put>();
for (int i = 0; i < NB_BATCH_ROWS * 10; i++) { for (int i = 0; i < NB_BATCH_ROWS * 10; i++) {
@ -4277,7 +4277,7 @@ public class TestFromClientSide {
new byte[][] { HConstants.CATALOG_FAMILY, Bytes.toBytes("info2") }, 1, 1024); new byte[][] { HConstants.CATALOG_FAMILY, Bytes.toBytes("info2") }, 1, 1024);
// set block size to 64 to making 2 kvs into one block, bypassing the walkForwardInSingleRow // set block size to 64 to making 2 kvs into one block, bypassing the walkForwardInSingleRow
// in Store.rowAtOrBeforeFromStoreFile // in Store.rowAtOrBeforeFromStoreFile
table.setAutoFlush(true); table.setAutoFlushTo(true);
String regionName = table.getRegionLocations().firstKey().getEncodedName(); String regionName = table.getRegionLocations().firstKey().getEncodedName();
HRegion region = HRegion region =
TEST_UTIL.getRSForFirstRegionInTable(tableAname).getFromOnlineRegions(regionName); TEST_UTIL.getRSForFirstRegionInTable(tableAname).getFromOnlineRegions(regionName);

View File

@ -263,7 +263,7 @@ public class TestMultiParallel {
// Load the data // Load the data
LOG.info("get new table"); LOG.info("get new table");
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
table.setAutoFlush(false, true); table.setAutoFlushTo(false);
table.setWriteBufferSize(10 * 1024 * 1024); table.setWriteBufferSize(10 * 1024 * 1024);
LOG.info("constructPutRequests"); LOG.info("constructPutRequests");

View File

@ -178,7 +178,7 @@ public class TestHTableWrapper {
boolean initialAutoFlush = hTableInterface.isAutoFlush(); boolean initialAutoFlush = hTableInterface.isAutoFlush();
hTableInterface.setAutoFlushTo(false); hTableInterface.setAutoFlushTo(false);
assertFalse(hTableInterface.isAutoFlush()); assertFalse(hTableInterface.isAutoFlush());
hTableInterface.setAutoFlush(true, true); hTableInterface.setAutoFlushTo(true);
assertTrue(hTableInterface.isAutoFlush()); assertTrue(hTableInterface.isAutoFlush());
hTableInterface.setAutoFlushTo(initialAutoFlush); hTableInterface.setAutoFlushTo(initialAutoFlush);
} }

View File

@ -921,7 +921,7 @@ public class TestDistributedLogSplitting {
if (key == null || key.length == 0) { if (key == null || key.length == 0) {
key = new byte[] { 0, 0, 0, 0, 1 }; key = new byte[] { 0, 0, 0, 0, 1 };
} }
ht.setAutoFlush(true, true); ht.setAutoFlushTo(true);
Put put = new Put(key); Put put = new Put(key);
put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'}); put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'});
ht.put(put); ht.put(put);
@ -1612,7 +1612,7 @@ public class TestDistributedLogSplitting {
* Load table with puts and deletes with expected values so that we can verify later * Load table with puts and deletes with expected values so that we can verify later
*/ */
private void prepareData(final HTable t, final byte[] f, final byte[] column) throws IOException { private void prepareData(final HTable t, final byte[] f, final byte[] column) throws IOException {
t.setAutoFlush(false, true); t.setAutoFlushTo(false);
byte[] k = new byte[3]; byte[] k = new byte[3];
// add puts // add puts

View File

@ -348,7 +348,7 @@ public class TestRegionServerMetrics {
TEST_UTIL.createTable(tableName, cf); TEST_UTIL.createTable(tableName, cf);
HTable t = new HTable(conf, tableName); HTable t = new HTable(conf, tableName);
t.setAutoFlush(false, true); t.setAutoFlushTo(false);
for (int insertCount =0; insertCount < 100; insertCount++) { for (int insertCount =0; insertCount < 100; insertCount++) {
Put p = new Put(Bytes.toBytes("" + insertCount + "row")); Put p = new Put(Bytes.toBytes("" + insertCount + "row"));
p.add(cf, qualifier, val); p.add(cf, qualifier, val);

View File

@ -456,7 +456,7 @@ public class TestLogRolling {
writeData(table, 1002); writeData(table, 1002);
table.setAutoFlush(true, true); table.setAutoFlushTo(true);
long curTime = System.currentTimeMillis(); long curTime = System.currentTimeMillis();
LOG.info("log.getCurrentFileName()): " + DefaultWALProvider.getCurrentFileName(log)); LOG.info("log.getCurrentFileName()): " + DefaultWALProvider.getCurrentFileName(log));

View File

@ -54,7 +54,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
*/ */
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
((HTable)htable1).setAutoFlush(false, true); htable1.setAutoFlushTo(false);
// Starting and stopping replication can make us miss new logs, // Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue // rolling like this makes sure the most recent one gets added to the queue
for (JVMClusterUtil.RegionServerThread r : for (JVMClusterUtil.RegionServerThread r :

View File

@ -69,7 +69,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
*/ */
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
((HTable)htable1).setAutoFlush(true, true); htable1.setAutoFlushTo(true);
// Starting and stopping replication can make us miss new logs, // Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue // rolling like this makes sure the most recent one gets added to the queue
for ( JVMClusterUtil.RegionServerThread r : for ( JVMClusterUtil.RegionServerThread r :
@ -247,7 +247,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
LOG.info("testSmallBatch"); LOG.info("testSmallBatch");
Put put; Put put;
// normal Batch tests // normal Batch tests
((HTable)htable1).setAutoFlush(false, true); htable1.setAutoFlushTo(false);
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
put = new Put(Bytes.toBytes(i)); put = new Put(Bytes.toBytes(i));
put.add(famName, row, row); put.add(famName, row, row);
@ -387,7 +387,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
public void testLoading() throws Exception { public void testLoading() throws Exception {
LOG.info("Writing out rows to table1 in testLoading"); LOG.info("Writing out rows to table1 in testLoading");
htable1.setWriteBufferSize(1024); htable1.setWriteBufferSize(1024);
((HTable)htable1).setAutoFlush(false, true); ((HTable)htable1).setAutoFlushTo(false);
for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) { for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
Put put = new Put(Bytes.toBytes(i)); Put put = new Put(Bytes.toBytes(i));
put.add(famName, row, row); put.add(famName, row, row);

View File

@ -678,7 +678,7 @@ public class SnapshotTestingUtils {
public static void loadData(final HBaseTestingUtility util, final HTable table, int rows, public static void loadData(final HBaseTestingUtility util, final HTable table, int rows,
byte[]... families) throws IOException, InterruptedException { byte[]... families) throws IOException, InterruptedException {
table.setAutoFlush(false, true); table.setAutoFlushTo(false);
// Ensure one row per region // Ensure one row per region
assertTrue(rows >= KEYS.length); assertTrue(rows >= KEYS.length);