HBASE-12086 Fix bugs in HTableMultiplexer

Signed-off-by: Elliott Clark <eclark@apache.org>
David Deng, 2014-09-23 22:46:03 -07:00 (committed by Elliott Clark)
parent 31ed817447
commit 78d532e5f3
4 changed files with 170 additions and 144 deletions

AsyncProcess.java

@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
@@ -53,6 +52,7 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.htrace.Trace;
+
 import com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -288,6 +288,10 @@ class AsyncProcess {
     this.rpcFactory = rpcFactory;
   }
 
+  /**
+   * @return pool if non null, otherwise returns this.pool if non null, otherwise throws
+   *         RuntimeException
+   */
   private ExecutorService getPool(ExecutorService pool) {
     if (pool != null) return pool;
     if (this.pool != null) return this.pool;
@@ -352,8 +356,8 @@ class AsyncProcess {
         RegionLocations locs = hConnection.locateRegion(
             tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
         if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
-          throw new IOException("#" + id + ", no location found, aborting submit for" +
-              " tableName=" + tableName + " rowkey=" + Arrays.toString(r.getRow()));
+          throw new IOException("#" + id + ", no location found, aborting submit for"
+              + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
         }
         loc = locs.getDefaultRegionLocation();
       } catch (IOException ex) {
@@ -383,15 +387,24 @@ class AsyncProcess {
     if (retainedActions.isEmpty()) return NO_REQS_RESULT;
 
+    return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
+      locationErrors, locationErrorRows, actionsByServer, pool);
+  }
+
+  <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
+      List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
+      Object[] results, boolean needResults, List<Exception> locationErrors,
+      List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
+      ExecutorService pool) {
     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
-        tableName, retainedActions, nonceGroup, pool, callback, null, needResults);
+        tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
     // Add location errors if any
     if (locationErrors != null) {
       for (int i = 0; i < locationErrors.size(); ++i) {
         int originalIndex = locationErrorRows.get(i);
         Row row = retainedActions.get(originalIndex).getAction();
         ars.manageError(originalIndex, row,
             Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
       }
     }
     ars.sendMultiAction(actionsByServer, 1, null, false);
@@ -406,7 +419,7 @@ class AsyncProcess {
    * @param actionsByServer the multiaction per server
    * @param nonceGroup Nonce group.
    */
-  private void addAction(ServerName server, byte[] regionName, Action<Row> action,
+  private static void addAction(ServerName server, byte[] regionName, Action<Row> action,
       Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
     MultiAction<Row> multiAction = actionsByServer.get(server);
     if (multiAction == null) {
@@ -531,7 +544,7 @@ class AsyncProcess {
     return ars;
   }
 
-  private void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
+  private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
     if (!(r instanceof Append) && !(r instanceof Increment)) return;
     action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
   }
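The refactoring above splits submit() so that the per-row location lookup stays where it was, while the new package-private submitMultiActions() accepts actions already grouped by region server plus a caller-supplied results array. A minimal sketch of driving it directly, modeled on the HTableFlushWorker changes later in this commit; it assumes caller-provided puts, regionName, server, ap (the AsyncProcess), and pool, and would have to live in org.apache.hadoop.hbase.client since the method is package-private:

    List<Action<Row>> retainedActions = new ArrayList<>(puts.size());
    MultiAction<Row> actions = new MultiAction<>();
    for (int i = 0; i < puts.size(); i++) {
      // The original index i is what ties each action to its slot in results[].
      Action<Row> action = new Action<Row>(puts.get(i), i);
      actions.add(regionName, action);
      retainedActions.add(action);
    }
    Object[] results = new Object[actions.size()];
    Map<ServerName, MultiAction<Row>> actionsByServer = Collections.singletonMap(server, actions);
    AsyncRequestFuture arf = ap.submitMultiActions(null, retainedActions, 0L, null, results,
        true, null, null, actionsByServer, pool);
    arf.waitUntilDone();        // block until every action has a result or an error
    if (arf.hasError()) {
      throw arf.getErrors();    // RetriesExhaustedWithDetailsException describing the failures
    }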

HTable.java

@@ -318,6 +318,13 @@ public class HTable implements HTableInterface, RegionLocator {
     cleanupConnectionOnClose = false;
   }
 
+  /**
+   * @return maxKeyValueSize from configuration.
+   */
+  public static int getMaxKeyValueSize(Configuration conf) {
+    return conf.getInt("hbase.client.keyvalue.maxsize", -1);
+  }
+
   /**
    * setup this HTable's parameter based on the passed configuration
    */
@@ -348,8 +355,7 @@ public class HTable implements HTableInterface, RegionLocator {
     ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
     multiAp = this.connection.getAsyncProcess();
 
-    this.maxKeyValueSize = this.configuration.getInt(
-        "hbase.client.keyvalue.maxsize", -1);
+    this.maxKeyValueSize = getMaxKeyValueSize(this.configuration);
     this.closed = false;
   }
@@ -1470,7 +1476,12 @@ public class HTable implements HTableInterface, RegionLocator {
   }
 
   // validate for well-formedness
-  public void validatePut(final Put put) throws IllegalArgumentException{
+  public void validatePut(final Put put) throws IllegalArgumentException {
+    validatePut(put, maxKeyValueSize);
+  }
+
+  // validate for well-formedness
+  public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
     if (put.isEmpty()) {
       throw new IllegalArgumentException("No columns to insert");
     }
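Because validatePut() and getMaxKeyValueSize() are now static, a Put can be checked for well-formedness without constructing an HTable, which is exactly what HTableMultiplexer does below. A minimal sketch (the row, family, and qualifier values are illustrative):

    Configuration conf = HBaseConfiguration.create();
    // -1, the default for "hbase.client.keyvalue.maxsize", disables the size check.
    int maxKeyValueSize = HTable.getMaxKeyValueSize(conf);

    Put put = new Put(Bytes.toBytes("row-1"));
    put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
    // Throws IllegalArgumentException if the Put is empty or a cell is oversized.
    HTable.validatePut(put, maxKeyValueSize);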

HTableMultiplexer.java

@@ -22,13 +22,12 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -41,8 +40,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
@@ -67,35 +67,35 @@ public class HTableMultiplexer {
   static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = "hbase.tablemultiplexer.flush.frequency.ms";
 
-  private Map<TableName, HTable> tableNameToHTableMap;
-
   /** The map between each region server to its corresponding buffer queue */
-  private Map<HRegionLocation, LinkedBlockingQueue<PutStatus>>
-      serverToBufferQueueMap;
+  private final Map<HRegionLocation, LinkedBlockingQueue<PutStatus>> serverToBufferQueueMap =
+      new ConcurrentHashMap<>();
 
   /** The map between each region server to its flush worker */
-  private Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap;
+  private final Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap =
+      new ConcurrentHashMap<>();
 
-  private Configuration conf;
-  private int retryNum;
+  private final Configuration conf;
+  private final ClusterConnection conn;
+  private final ExecutorService pool;
+  private final int retryNum;
   private int perRegionServerBufferQueueSize;
+  private final int maxKeyValueSize;
 
   /**
-   *
    * @param conf The HBaseConfiguration
-   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops
-   *          for each region server before dropping the request.
+   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
+   *          each region server before dropping the request.
    */
-  public HTableMultiplexer(Configuration conf,
-      int perRegionServerBufferQueueSize) throws ZooKeeperConnectionException {
+  public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
+      throws IOException {
     this.conf = conf;
-    this.serverToBufferQueueMap = new ConcurrentHashMap<HRegionLocation,
-        LinkedBlockingQueue<PutStatus>>();
-    this.serverToFlushWorkerMap = new ConcurrentHashMap<HRegionLocation, HTableFlushWorker>();
-    this.tableNameToHTableMap = new ConcurrentSkipListMap<TableName, HTable>();
+    this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
+    this.pool = HTable.getDefaultExecutor(conf);
     this.retryNum = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
+    this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
   }
 
   /**
@@ -110,10 +110,6 @@ public class HTableMultiplexer {
     return put(tableName, put, this.retryNum);
   }
 
-  public boolean put(byte[] tableName, final Put put) throws IOException {
-    return put(TableName.valueOf(tableName), put);
-  }
-
   /**
    * The puts request will be buffered by their corresponding buffer queue.
    * Return the list of puts which could not be queued.
@@ -165,15 +161,14 @@ public class HTableMultiplexer {
       return false;
     }
 
-    LinkedBlockingQueue<PutStatus> queue;
-    HTable htable = getHTable(tableName);
     try {
-      htable.validatePut(put);
-      HRegionLocation loc = htable.getRegionLocation(put.getRow(), false);
+      HTable.validatePut(put, maxKeyValueSize);
+      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
       if (loc != null) {
         // Add the put pair into its corresponding queue.
-        queue = addNewRegionServer(loc, htable);
-        // Generate a MultiPutStatus obj and offer it into the queue
+        LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
+
+        // Generate a MultiPutStatus object and offer it into the queue
         PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
         return queue.offer(s);
@@ -196,43 +191,30 @@ public class HTableMultiplexer {
     return new HTableMultiplexerStatus(serverToFlushWorkerMap);
   }
 
-  private HTable getHTable(TableName tableName) throws IOException {
-    HTable htable = this.tableNameToHTableMap.get(tableName);
-    if (htable == null) {
-      synchronized (this.tableNameToHTableMap) {
-        htable = this.tableNameToHTableMap.get(tableName);
-        if (htable == null) {
-          htable = new HTable(conf, tableName);
-          this.tableNameToHTableMap.put(tableName, htable);
-        }
-      }
-    }
-    return htable;
-  }
-
-  private synchronized LinkedBlockingQueue<PutStatus> addNewRegionServer(
-      HRegionLocation addr, HTable htable) {
-    LinkedBlockingQueue<PutStatus> queue =
-        serverToBufferQueueMap.get(addr);
-    if (queue == null) {
-      // Create a queue for the new region server
-      queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
-      serverToBufferQueueMap.put(addr, queue);
-      // Create the flush worker
-      HTableFlushWorker worker = new HTableFlushWorker(conf, addr,
-          this, queue, htable);
-      this.serverToFlushWorkerMap.put(addr, worker);
-      // Launch a daemon thread to flush the puts
-      // from the queue to its corresponding region server.
-      String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-"
-          + (poolID++);
-      Thread t = new Thread(worker, name);
-      t.setDaemon(true);
-      t.start();
-    }
+  private LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
+    LinkedBlockingQueue<PutStatus> queue = serverToBufferQueueMap.get(addr);
+    if (queue == null) {
+      synchronized (this.serverToBufferQueueMap) {
+        queue = serverToBufferQueueMap.get(addr);
+        if (queue == null) {
+          // Create a queue for the new region server
+          queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
+          serverToBufferQueueMap.put(addr, queue);
+
+          // Create the flush worker
+          HTableFlushWorker worker =
+              new HTableFlushWorker(conf, this.conn, addr, this, queue, pool);
+          this.serverToFlushWorkerMap.put(addr, worker);
+
+          // Launch a daemon thread to flush the puts
+          // from the queue to its corresponding region server.
+          String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-" + (poolID++);
+          Thread t = new Thread(worker, name);
+          t.setDaemon(true);
+          t.start();
+        }
+      }
+    }
     return queue;
   }
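The new getQueue() uses the check / synchronize / re-check idiom: only the first thread to touch a region server creates its queue and starts its flush worker, while later callers take the lock-free read path through the ConcurrentHashMap. On Java 8 the same shape could be written with computeIfAbsent, sketched here purely for comparison (HBase still supported Java 7 at the time, and the worker-thread start would have to move into the mapping function with the same one-time guarantee):

    LinkedBlockingQueue<PutStatus> queue = serverToBufferQueueMap.computeIfAbsent(addr, a -> {
      LinkedBlockingQueue<PutStatus> q =
          new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
      // creation of the HTableFlushWorker and its daemon thread would go here as well
      return q;
    });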
@@ -405,28 +387,25 @@ public class HTableMultiplexer {
   }
 
   private static class HTableFlushWorker implements Runnable {
-    private HRegionLocation addr;
-    private Configuration conf;
-    private LinkedBlockingQueue<PutStatus> queue;
-    private HTableMultiplexer htableMultiplexer;
-    private AtomicLong totalFailedPutCount;
-    private AtomicInteger currentProcessingPutCount;
-    private AtomicAverageCounter averageLatency;
-    private AtomicLong maxLatency;
-    private HTable htable; // For Multi
+    private final HRegionLocation addr;
+    private final Configuration conf;
+    private final ClusterConnection conn;
+    private final LinkedBlockingQueue<PutStatus> queue;
+    private final HTableMultiplexer htableMultiplexer;
+    private final AtomicLong totalFailedPutCount = new AtomicLong(0);
+    private final AtomicInteger currentProcessingPutCount = new AtomicInteger(0);
+    private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
+    private final AtomicLong maxLatency = new AtomicLong(0);
+    private final ExecutorService pool;
 
-    public HTableFlushWorker(Configuration conf, HRegionLocation addr,
-        HTableMultiplexer htableMultiplexer,
-        LinkedBlockingQueue<PutStatus> queue, HTable htable) {
+    public HTableFlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
+        HTableMultiplexer htableMultiplexer, LinkedBlockingQueue<PutStatus> queue, ExecutorService pool) {
       this.addr = addr;
       this.conf = conf;
+      this.conn = conn;
       this.htableMultiplexer = htableMultiplexer;
       this.queue = queue;
-      this.totalFailedPutCount = new AtomicLong(0);
-      this.currentProcessingPutCount = new AtomicInteger(0);
-      this.averageLatency = new AtomicAverageCounter();
-      this.maxLatency = new AtomicLong(0);
-      this.htable = htable;
+      this.pool = pool;
     }
 
     public long getTotalFailedCount() {
@@ -466,7 +445,7 @@ public class HTableMultiplexer {
     @edu.umd.cs.findbugs.annotations.SuppressWarnings
         (value = "REC_CATCH_EXCEPTION", justification = "na")
     public void run() {
-      List<PutStatus> processingList = new ArrayList<PutStatus>();
+      List<PutStatus> processingList = new ArrayList<>();
       /**
        * The frequency in milliseconds for the current thread to process the corresponding
        * buffer queue.
@@ -481,6 +460,8 @@ public class HTableMultiplexer {
         Thread.currentThread().interrupt();
       }
 
+      AsyncProcess ap = conn.getAsyncProcess();
+
       long start, elapsed;
       int failedCount = 0;
       while (true) {
@@ -496,16 +477,29 @@ public class HTableMultiplexer {
           currentProcessingPutCount.set(processingList.size());
           if (processingList.size() > 0) {
-            ArrayList<Put> list = new ArrayList<Put>(processingList.size());
-            for (PutStatus putStatus: processingList) {
-              list.add(putStatus.getPut());
+            List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
+            MultiAction<Row> actions = new MultiAction<>();
+            for (int i = 0; i < processingList.size(); i++) {
+              PutStatus putStatus = processingList.get(i);
+              Action<Row> action = new Action<Row>(putStatus.getPut(), i);
+              actions.add(putStatus.getRegionInfo().getRegionName(), action);
+              retainedActions.add(action);
             }
-            // Process this multiput request
-            List<Put> failed = null;
-            Object[] results = new Object[list.size()];
+
+            // Process this multi-put request
+            List<PutStatus> failed = null;
+            Object[] results = new Object[actions.size()];
+            ServerName server = addr.getServerName();
+            Map<ServerName, MultiAction<Row>> actionsByServer =
+                Collections.singletonMap(server, actions);
             try {
-              htable.batch(list, results);
+              AsyncRequestFuture arf =
+                  ap.submitMultiActions(null, retainedActions, 0L, null, results,
+                    true, null, null, actionsByServer, pool);
+              arf.waitUntilDone();
+              if (arf.hasError()) {
+                throw arf.getErrors();
+              }
             } catch (IOException e) {
               LOG.debug("Caught some exceptions " + e
                   + " when flushing puts to region server " + addr.getHostnamePort());
@@ -515,35 +509,26 @@ public class HTableMultiplexer {
              // results are returned in the same order as the requests in list
              // walk the list backwards, so we can remove from list without
              // impacting the indexes of earlier members
-              for (int i = results.length - 1; i >= 0; i--) {
-                if (results[i] instanceof Result) {
-                  // successful Puts are removed from the list here.
-                  list.remove(i);
+              for (int i = 0; i < results.length; i++) {
+                if (results[i] == null) {
+                  if (failed == null) {
+                    failed = new ArrayList<PutStatus>();
+                  }
+                  failed.add(processingList.get(i));
                 }
              }
-              failed = list;
            }
 
            if (failed != null) {
-              if (failed.size() == processingList.size()) {
-                // All the puts for this region server are failed. Going to retry it later
-                for (PutStatus putStatus: processingList) {
-                  if (!resubmitFailedPut(putStatus, this.addr)) {
-                    failedCount++;
-                  }
-                }
-              } else {
-                Set<Put> failedPutSet = new HashSet<Put>(failed);
-                for (PutStatus putStatus: processingList) {
-                  if (failedPutSet.contains(putStatus.getPut())
-                      && !resubmitFailedPut(putStatus, this.addr)) {
-                    failedCount++;
-                  }
+              // Resubmit failed puts
+              for (PutStatus putStatus : processingList) {
+                if (!resubmitFailedPut(putStatus, this.addr)) {
+                  failedCount++;
                }
              }
+
+              // Update the totalFailedCount
+              this.totalFailedPutCount.addAndGet(failedCount);
            }
-            // Update the totalFailedCount
-            this.totalFailedPutCount.addAndGet(failedCount);
 
            elapsed = EnvironmentEdgeManager.currentTime() - start;
            // Update latency counters
@@ -580,7 +565,7 @@ public class HTableMultiplexer {
            // Log all the exceptions and move on
            LOG.debug("Caught some exceptions " + e
                + " when flushing puts to region server "
-                + addr.getHostnamePort());
+                + addr.getHostnamePort(), e);
          }
        }
      }
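After this change the multiplexer owns a single ClusterConnection and a shared thread pool instead of one cached HTable per table, so its constructor can now fail with IOException. A minimal usage sketch, mirroring the test below (table, row, and queue-size values are illustrative):

    Configuration conf = HBaseConfiguration.create();
    // Throws IOException now that a ClusterConnection is created eagerly.
    HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 1000);

    Put put = new Put(Bytes.toBytes("row-1"));
    put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));

    // Returns false if the buffer queue for the target region server is full;
    // the write itself happens asynchronously on that server's daemon flush worker.
    boolean queued = multiplexer.put(TableName.valueOf("t1"), put);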

TestHTableMultiplexer.java

@@ -27,8 +27,8 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -64,10 +64,27 @@ public class TestHTableMultiplexer {
     TEST_UTIL.shutdownMiniCluster();
   }
 
+  private static void checkExistence(HTable htable, byte[] row, byte[] family, byte[] quality)
+      throws Exception {
+    // verify that the Get returns the correct result
+    Result r;
+    Get get = new Get(row);
+    get.addColumn(FAMILY, QUALIFIER);
+    int nbTry = 0;
+    do {
+      assertTrue("Fail to get from " + htable.getName() + " after " + nbTry + " tries", nbTry < 50);
+      nbTry++;
+      Thread.sleep(100);
+      r = htable.get(get);
+    } while (r == null || r.getValue(FAMILY, QUALIFIER) == null);
+    assertEquals("value", Bytes.toStringBinary(VALUE1),
+      Bytes.toStringBinary(r.getValue(FAMILY, QUALIFIER)));
+  }
+
   @Test
   public void testHTableMultiplexer() throws Exception {
-    TableName TABLE =
-        TableName.valueOf("testHTableMultiplexer");
+    TableName TABLE_1 = TableName.valueOf("testHTableMultiplexer_1");
+    TableName TABLE_2 = TableName.valueOf("testHTableMultiplexer_2");
     final int NUM_REGIONS = 10;
     final int VERSION = 3;
     List<Put> failedPuts;
@@ -76,35 +93,35 @@ public class TestHTableMultiplexer {
     HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
         PER_REGIONSERVER_QUEUE_SIZE);
 
-    HTable ht = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, VERSION,
-        Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
-    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
+    HTable htable1 =
+        TEST_UTIL.createTable(TABLE_1, new byte[][] { FAMILY }, VERSION,
+          Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
+    HTable htable2 =
+        TEST_UTIL.createTable(TABLE_2, new byte[][] { FAMILY }, VERSION, Bytes.toBytes("aaaaa"),
+          Bytes.toBytes("zzzzz"), NUM_REGIONS);
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_1);
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_2);
 
-    byte[][] startRows = ht.getStartKeys();
-    byte[][] endRows = ht.getEndKeys();
+    byte[][] startRows = htable1.getStartKeys();
+    byte[][] endRows = htable1.getEndKeys();
 
     // SinglePut case
     for (int i = 0; i < NUM_REGIONS; i++) {
       byte [] row = startRows[i];
       if (row == null || row.length <= 0) continue;
-      Put put = new Put(row);
-      put.add(FAMILY, QUALIFIER, VALUE1);
-      success = multiplexer.put(TABLE, put);
-      assertTrue(success);
+      Put put = new Put(row).add(FAMILY, QUALIFIER, VALUE1);
+      success = multiplexer.put(TABLE_1, put);
+      assertTrue("multiplexer.put returns", success);
 
-      LOG.info("Put for " + Bytes.toString(startRows[i]) + " @ iteration " + (i+1));
+      put = new Put(row).add(FAMILY, QUALIFIER, VALUE1);
+      success = multiplexer.put(TABLE_2, put);
+      assertTrue("multiplexer.put failed", success);
+
+      LOG.info("Put for " + Bytes.toStringBinary(startRows[i]) + " @ iteration " + (i + 1));
 
       // verify that the Get returns the correct result
-      Get get = new Get(startRows[i]);
-      get.addColumn(FAMILY, QUALIFIER);
-      Result r;
-      int nbTry = 0;
-      do {
-        assertTrue(nbTry++ < 50);
-        Thread.sleep(100);
-        r = ht.get(get);
-      } while (r == null || r.getValue(FAMILY, QUALIFIER) == null);
-      assertEquals(0, Bytes.compareTo(VALUE1, r.getValue(FAMILY, QUALIFIER)));
+      checkExistence(htable1, startRows[i], FAMILY, QUALIFIER);
+      checkExistence(htable2, startRows[i], FAMILY, QUALIFIER);
     }
 
     // MultiPut case
@@ -116,7 +133,7 @@ public class TestHTableMultiplexer {
       put.add(FAMILY, QUALIFIER, VALUE2);
       multiput.add(put);
     }
-    failedPuts = multiplexer.put(TABLE, multiput);
+    failedPuts = multiplexer.put(TABLE_1, multiput);
     assertTrue(failedPuts == null);
 
     // verify that the Get returns the correct result
@@ -130,7 +147,7 @@ public class TestHTableMultiplexer {
      do {
        assertTrue(nbTry++ < 50);
        Thread.sleep(100);
-        r = ht.get(get);
+        r = htable1.get(get);
      } while (r == null || r.getValue(FAMILY, QUALIFIER) == null ||
          Bytes.compareTo(VALUE2, r.getValue(FAMILY, QUALIFIER)) != 0);
    }