diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java deleted file mode 100644 index e6b061e45fd..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ /dev/null @@ -1,664 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.IOException; -import java.util.AbstractMap.SimpleEntry; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables. - * Each put will be sharded into different buffer queues based on its destination region server. - * So each region server buffer queue will only have the puts which share the same destination. - * And each queue will have a flush worker thread to flush the puts request to the region server. - * If any queue is full, the HTableMultiplexer starts to drop the Put requests for that - * particular queue. - * - * Also all the puts will be retried as a configuration number before dropping. - * And the HTableMultiplexer can report the number of buffered requests and the number of the - * failed (dropped) requests in total or on per region server basis. - * - * This class is thread safe. - */ -@InterfaceAudience.Public -public class HTableMultiplexer { - private static final Logger LOG = LoggerFactory.getLogger(HTableMultiplexer.class.getName()); - - public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS = - "hbase.tablemultiplexer.flush.period.ms"; - public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads"; - public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE = - "hbase.client.max.retries.in.queue"; - - /** The map between each region server to its flush worker */ - private final Map serverToFlushWorkerMap = - new ConcurrentHashMap<>(); - - private final Configuration workerConf; - private final ClusterConnection conn; - private final ExecutorService pool; - private final int maxAttempts; - private final int perRegionServerBufferQueueSize; - private final int maxKeyValueSize; - private final ScheduledExecutorService executor; - private final long flushPeriod; - - /** - * @param conf The HBaseConfiguration - * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for - * each region server before dropping the request. - */ - public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize) - throws IOException { - this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize); - } - - /** - * @param conn The HBase connection. - * @param conf The HBase configuration - * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for - * each region server before dropping the request. - */ - public HTableMultiplexer(Connection conn, Configuration conf, - int perRegionServerBufferQueueSize) { - this.conn = (ClusterConnection) conn; - this.pool = HTable.getDefaultExecutor(conf); - // how many times we could try in total, one more than retry number - this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; - this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize; - this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf); - this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100); - int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10); - this.executor = - Executors.newScheduledThreadPool(initThreads, - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build()); - - this.workerConf = HBaseConfiguration.create(conf); - // We do not do the retry because we need to reassign puts to different queues if regions are - // moved. - this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); - } - - /** - * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already - * been closed. - * @throws IOException If there is an error closing the connection. - */ - @SuppressWarnings("deprecation") - public synchronized void close() throws IOException { - if (!getConnection().isClosed()) { - getConnection().close(); - } - } - - /** - * The put request will be buffered by its corresponding buffer queue. Return false if the queue - * is already full. - * @param tableName - * @param put - * @return true if the request can be accepted by its corresponding buffer queue. - */ - public boolean put(TableName tableName, final Put put) { - return put(tableName, put, this.maxAttempts); - } - - /** - * The puts request will be buffered by their corresponding buffer queue. - * Return the list of puts which could not be queued. - * @param tableName - * @param puts - * @return the list of puts which could not be queued - */ - public List put(TableName tableName, final List puts) { - if (puts == null) - return null; - - List failedPuts = null; - boolean result; - for (Put put : puts) { - result = put(tableName, put, this.maxAttempts); - if (result == false) { - - // Create the failed puts list if necessary - if (failedPuts == null) { - failedPuts = new ArrayList<>(); - } - // Add the put to the failed puts list - failedPuts.add(put); - } - } - return failedPuts; - } - - /** - * @deprecated Use {@link #put(TableName, List) } instead. - */ - @Deprecated - public List put(byte[] tableName, final List puts) { - return put(TableName.valueOf(tableName), puts); - } - - /** - * The put request will be buffered by its corresponding buffer queue. And the put request will be - * retried before dropping the request. - * Return false if the queue is already full. - * @return true if the request can be accepted by its corresponding buffer queue. - */ - public boolean put(final TableName tableName, final Put put, int maxAttempts) { - if (maxAttempts <= 0) { - return false; - } - - try { - HTable.validatePut(put, maxKeyValueSize); - // Allow mocking to get at the connection, but don't expose the connection to users. - ClusterConnection conn = (ClusterConnection) getConnection(); - // AsyncProcess in the FlushWorker should take care of refreshing the location cache - // as necessary. We shouldn't have to do that here. - HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false); - if (loc != null) { - // Add the put pair into its corresponding queue. - LinkedBlockingQueue queue = getQueue(loc); - - // Generate a MultiPutStatus object and offer it into the queue - PutStatus s = new PutStatus(loc.getRegion(), put, maxAttempts); - - return queue.offer(s); - } - } catch (IOException e) { - LOG.debug("Cannot process the put " + put, e); - } - return false; - } - - /** - * @deprecated Use {@link #put(TableName, Put) } instead. - */ - @Deprecated - public boolean put(final byte[] tableName, final Put put, int retry) { - return put(TableName.valueOf(tableName), put, retry); - } - - /** - * @deprecated Use {@link #put(TableName, Put)} instead. - */ - @Deprecated - public boolean put(final byte[] tableName, Put put) { - return put(TableName.valueOf(tableName), put); - } - - /** - * @return the current HTableMultiplexerStatus - */ - public HTableMultiplexerStatus getHTableMultiplexerStatus() { - return new HTableMultiplexerStatus(serverToFlushWorkerMap); - } - - @VisibleForTesting - LinkedBlockingQueue getQueue(HRegionLocation addr) { - FlushWorker worker = serverToFlushWorkerMap.get(addr); - if (worker == null) { - synchronized (this.serverToFlushWorkerMap) { - worker = serverToFlushWorkerMap.get(addr); - if (worker == null) { - // Create the flush worker - worker = new FlushWorker(workerConf, this.conn, addr, this, - perRegionServerBufferQueueSize, pool, executor); - this.serverToFlushWorkerMap.put(addr, worker); - executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS); - } - } - } - return worker.getQueue(); - } - - @VisibleForTesting - ClusterConnection getConnection() { - return this.conn; - } - - /** - * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer. - * report the number of buffered requests and the number of the failed (dropped) requests - * in total or on per region server basis. - */ - @InterfaceAudience.Public - public static class HTableMultiplexerStatus { - private long totalFailedPutCounter; - private long totalBufferedPutCounter; - private long maxLatency; - private long overallAverageLatency; - private Map serverToFailedCounterMap; - private Map serverToBufferedCounterMap; - private Map serverToAverageLatencyMap; - private Map serverToMaxLatencyMap; - - public HTableMultiplexerStatus( - Map serverToFlushWorkerMap) { - this.totalBufferedPutCounter = 0; - this.totalFailedPutCounter = 0; - this.maxLatency = 0; - this.overallAverageLatency = 0; - this.serverToBufferedCounterMap = new HashMap<>(); - this.serverToFailedCounterMap = new HashMap<>(); - this.serverToAverageLatencyMap = new HashMap<>(); - this.serverToMaxLatencyMap = new HashMap<>(); - this.initialize(serverToFlushWorkerMap); - } - - private void initialize( - Map serverToFlushWorkerMap) { - if (serverToFlushWorkerMap == null) { - return; - } - - long averageCalcSum = 0; - int averageCalcCount = 0; - for (Map.Entry entry : serverToFlushWorkerMap - .entrySet()) { - HRegionLocation addr = entry.getKey(); - FlushWorker worker = entry.getValue(); - - long bufferedCounter = worker.getTotalBufferedCount(); - long failedCounter = worker.getTotalFailedCount(); - long serverMaxLatency = worker.getMaxLatency(); - AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter(); - // Get sum and count pieces separately to compute overall average - SimpleEntry averageComponents = averageCounter - .getComponents(); - long serverAvgLatency = averageCounter.getAndReset(); - - this.totalBufferedPutCounter += bufferedCounter; - this.totalFailedPutCounter += failedCounter; - if (serverMaxLatency > this.maxLatency) { - this.maxLatency = serverMaxLatency; - } - averageCalcSum += averageComponents.getKey(); - averageCalcCount += averageComponents.getValue(); - - this.serverToBufferedCounterMap.put(addr.getHostnamePort(), - bufferedCounter); - this.serverToFailedCounterMap - .put(addr.getHostnamePort(), - failedCounter); - this.serverToAverageLatencyMap.put(addr.getHostnamePort(), - serverAvgLatency); - this.serverToMaxLatencyMap - .put(addr.getHostnamePort(), - serverMaxLatency); - } - this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum - / averageCalcCount : 0; - } - - public long getTotalBufferedCounter() { - return this.totalBufferedPutCounter; - } - - public long getTotalFailedCounter() { - return this.totalFailedPutCounter; - } - - public long getMaxLatency() { - return this.maxLatency; - } - - public long getOverallAverageLatency() { - return this.overallAverageLatency; - } - - public Map getBufferedCounterForEachRegionServer() { - return this.serverToBufferedCounterMap; - } - - public Map getFailedCounterForEachRegionServer() { - return this.serverToFailedCounterMap; - } - - public Map getMaxLatencyForEachRegionServer() { - return this.serverToMaxLatencyMap; - } - - public Map getAverageLatencyForEachRegionServer() { - return this.serverToAverageLatencyMap; - } - } - - @VisibleForTesting - static class PutStatus { - final RegionInfo regionInfo; - final Put put; - final int maxAttempCount; - - public PutStatus(RegionInfo regionInfo, Put put, int maxAttempCount) { - this.regionInfo = regionInfo; - this.put = put; - this.maxAttempCount = maxAttempCount; - } - } - - /** - * Helper to count the average over an interval until reset. - */ - private static class AtomicAverageCounter { - private long sum; - private int count; - - public AtomicAverageCounter() { - this.sum = 0L; - this.count = 0; - } - - public synchronized long getAndReset() { - long result = this.get(); - this.reset(); - return result; - } - - public synchronized long get() { - if (this.count == 0) { - return 0; - } - return this.sum / this.count; - } - - public synchronized SimpleEntry getComponents() { - return new SimpleEntry<>(sum, count); - } - - public synchronized void reset() { - this.sum = 0L; - this.count = 0; - } - - public synchronized void add(long value) { - this.sum += value; - this.count++; - } - } - - @VisibleForTesting - static class FlushWorker implements Runnable { - private final HRegionLocation addr; - private final LinkedBlockingQueue queue; - private final HTableMultiplexer multiplexer; - private final AtomicLong totalFailedPutCount = new AtomicLong(0); - private final AtomicInteger currentProcessingCount = new AtomicInteger(0); - private final AtomicAverageCounter averageLatency = new AtomicAverageCounter(); - private final AtomicLong maxLatency = new AtomicLong(0); - - private final AsyncProcess ap; - private final List processingList = new ArrayList<>(); - private final ScheduledExecutorService executor; - private final int maxRetryInQueue; - private final AtomicInteger retryInQueue = new AtomicInteger(0); - private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor - private final int operationTimeout; - private final ExecutorService pool; - public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, - HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, - ExecutorService pool, ScheduledExecutorService executor) { - this.addr = addr; - this.multiplexer = htableMultiplexer; - this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); - RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); - RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); - this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, - conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory); - this.executor = executor; - this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); - this.pool = pool; - } - - protected LinkedBlockingQueue getQueue() { - return this.queue; - } - - public long getTotalFailedCount() { - return totalFailedPutCount.get(); - } - - public long getTotalBufferedCount() { - return (long) queue.size() + currentProcessingCount.get(); - } - - public AtomicAverageCounter getAverageLatencyCounter() { - return this.averageLatency; - } - - public long getMaxLatency() { - return this.maxLatency.getAndSet(0); - } - - boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException { - // Decrease the retry count - final int retryCount = ps.maxAttempCount - 1; - - if (retryCount <= 0) { - // Update the failed counter and no retry any more. - return false; - } - - int cnt = getRetryInQueue().incrementAndGet(); - if (cnt > getMaxRetryInQueue()) { - // Too many Puts in queue for resubmit, give up this - getRetryInQueue().decrementAndGet(); - return false; - } - - final Put failedPut = ps.put; - // The currentPut is failed. So get the table name for the currentPut. - final TableName tableName = ps.regionInfo.getTable(); - - long delayMs = getNextDelay(retryCount); - if (LOG.isDebugEnabled()) { - LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount); - } - - // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating - // the region location cache when the Put original failed with some exception. If we keep - // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff - // that we expect it to. - getExecutor().schedule(new Runnable() { - @Override - public void run() { - boolean succ = false; - try { - succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount); - } finally { - FlushWorker.this.getRetryInQueue().decrementAndGet(); - if (!succ) { - FlushWorker.this.getTotalFailedPutCount().incrementAndGet(); - } - } - } - }, delayMs, TimeUnit.MILLISECONDS); - return true; - } - - @VisibleForTesting - long getNextDelay(int retryCount) { - return ConnectionUtils.getPauseTime(multiplexer.flushPeriod, - multiplexer.maxAttempts - retryCount - 1); - } - - @VisibleForTesting - AtomicInteger getRetryInQueue() { - return this.retryInQueue; - } - - @VisibleForTesting - int getMaxRetryInQueue() { - return this.maxRetryInQueue; - } - - @VisibleForTesting - AtomicLong getTotalFailedPutCount() { - return this.totalFailedPutCount; - } - - @VisibleForTesting - HTableMultiplexer getMultiplexer() { - return this.multiplexer; - } - - @VisibleForTesting - ScheduledExecutorService getExecutor() { - return this.executor; - } - - @Override - public void run() { - int failedCount = 0; - try { - long start = EnvironmentEdgeManager.currentTime(); - - // drain all the queued puts into the tmp list - processingList.clear(); - queue.drainTo(processingList); - if (processingList.isEmpty()) { - // Nothing to flush - return; - } - - currentProcessingCount.set(processingList.size()); - // failedCount is decreased whenever a Put is success or resubmit. - failedCount = processingList.size(); - - List retainedActions = new ArrayList<>(processingList.size()); - MultiAction actions = new MultiAction(); - for (int i = 0; i < processingList.size(); i++) { - PutStatus putStatus = processingList.get(i); - Action action = new Action(putStatus.put, i); - actions.add(putStatus.regionInfo.getRegionName(), action); - retainedActions.add(action); - } - - // Process this multi-put request - List failed = null; - Object[] results = new Object[actions.size()]; - ServerName server = addr.getServerName(); - Map actionsByServer = - Collections.singletonMap(server, actions); - try { - AsyncProcessTask task = AsyncProcessTask.newBuilder() - .setResults(results) - .setPool(pool) - .setRpcTimeout(writeRpcTimeout) - .setOperationTimeout(operationTimeout) - .build(); - AsyncRequestFuture arf = - ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer); - arf.waitUntilDone(); - if (arf.hasError()) { - // We just log and ignore the exception here since failed Puts will be resubmit again. - LOG.debug("Caught some exceptions when flushing puts to region server " - + addr.getHostnamePort(), arf.getErrors()); - } - } finally { - for (int i = 0; i < results.length; i++) { - if (results[i] instanceof Result) { - failedCount--; - } else { - if (failed == null) { - failed = new ArrayList<>(); - } - failed.add(processingList.get(i)); - } - } - } - - if (failed != null) { - // Resubmit failed puts - for (PutStatus putStatus : failed) { - if (resubmitFailedPut(putStatus, this.addr)) { - failedCount--; - } - } - } - - long elapsed = EnvironmentEdgeManager.currentTime() - start; - // Update latency counters - averageLatency.add(elapsed); - if (elapsed > maxLatency.get()) { - maxLatency.set(elapsed); - } - - // Log some basic info - if (LOG.isDebugEnabled()) { - LOG.debug("Processed " + currentProcessingCount + " put requests for " - + addr.getHostnamePort() + " and " + failedCount + " failed" - + ", latency for this send: " + elapsed); - } - - // Reset the current processing put count - currentProcessingCount.set(0); - } catch (RuntimeException e) { - // To make findbugs happy - // Log all the exceptions and move on - LOG.debug( - "Caught some exceptions " + e + " when flushing puts to region server " - + addr.getHostnamePort(), e); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - // Log all the exceptions and move on - LOG.debug( - "Caught some exceptions " + e + " when flushing puts to region server " - + addr.getHostnamePort(), e); - } finally { - // Update the totalFailedCount - this.totalFailedPutCount.addAndGet(failedCount); - } - } - } -} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java deleted file mode 100644 index cce4939279d..00000000000 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category(SmallTests.class) -public class TestHTableMultiplexerViaMocks { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestHTableMultiplexerViaMocks.class); - - private HTableMultiplexer mockMultiplexer; - private ClusterConnection mockConnection; - - @Before - public void setupTest() { - mockMultiplexer = mock(HTableMultiplexer.class); - mockConnection = mock(ClusterConnection.class); - - // Call the real put(TableName, Put, int) method - when(mockMultiplexer.put(any(TableName.class), any(), anyInt())).thenCallRealMethod(); - - // Return the mocked ClusterConnection - when(mockMultiplexer.getConnection()).thenReturn(mockConnection); - } - - @SuppressWarnings("deprecation") - @Test public void testConnectionClosing() throws IOException { - doCallRealMethod().when(mockMultiplexer).close(); - // If the connection is not closed - when(mockConnection.isClosed()).thenReturn(false); - - mockMultiplexer.close(); - - // We should close it - verify(mockConnection).close(); - } - - @SuppressWarnings("deprecation") - @Test public void testClosingAlreadyClosedConnection() throws IOException { - doCallRealMethod().when(mockMultiplexer).close(); - // If the connection is already closed - when(mockConnection.isClosed()).thenReturn(true); - - mockMultiplexer.close(); - - // We should not close it again - verify(mockConnection, times(0)).close(); - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java deleted file mode 100644 index 92468385356..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.List; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Category({LargeTests.class, ClientTests.class}) -public class TestHTableMultiplexer { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestHTableMultiplexer.class); - - private static final Logger LOG = LoggerFactory.getLogger(TestHTableMultiplexer.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static byte[] FAMILY = Bytes.toBytes("testFamily"); - private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); - private static byte[] VALUE1 = Bytes.toBytes("testValue1"); - private static byte[] VALUE2 = Bytes.toBytes("testValue2"); - private static int SLAVES = 3; - private static int PER_REGIONSERVER_QUEUE_SIZE = 100000; - - @Rule - public TestName name = new TestName(); - - /** - * @throws java.lang.Exception - */ - @BeforeClass - public static void setUpBeforeClass() throws Exception { - TEST_UTIL.startMiniCluster(SLAVES); - } - - /** - * @throws java.lang.Exception - */ - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - private static void checkExistence(Table htable, byte[] row, byte[] family, byte[] quality) - throws Exception { - // verify that the Get returns the correct result - Result r; - Get get = new Get(row); - get.addColumn(FAMILY, QUALIFIER); - int nbTry = 0; - do { - assertTrue("Fail to get from " + htable.getName() + " after " + nbTry + " tries", nbTry < 50); - nbTry++; - Thread.sleep(100); - r = htable.get(get); - } while (r == null || r.getValue(FAMILY, QUALIFIER) == null); - assertEquals("value", Bytes.toStringBinary(VALUE1), - Bytes.toStringBinary(r.getValue(FAMILY, QUALIFIER))); - } - - @Test - public void testHTableMultiplexer() throws Exception { - final TableName tableName1 = TableName.valueOf(name.getMethodName() + "_1"); - final TableName tableName2 = TableName.valueOf(name.getMethodName() + "_2"); - final int NUM_REGIONS = 10; - final int VERSION = 3; - List failedPuts; - boolean success; - - HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), - PER_REGIONSERVER_QUEUE_SIZE); - - Table htable1 = - TEST_UTIL.createTable(tableName1, new byte[][] { FAMILY }, VERSION, - Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS); - Table htable2 = - TEST_UTIL.createTable(tableName2, new byte[][] { FAMILY }, VERSION, Bytes.toBytes("aaaaa"), - Bytes.toBytes("zzzzz"), NUM_REGIONS); - TEST_UTIL.waitUntilAllRegionsAssigned(tableName1); - TEST_UTIL.waitUntilAllRegionsAssigned(tableName2); - - try (RegionLocator rl = TEST_UTIL.getConnection().getRegionLocator(tableName1)) { - byte[][] startRows = rl.getStartKeys(); - byte[][] endRows = rl.getEndKeys(); - - // SinglePut case - for (int i = 0; i < NUM_REGIONS; i++) { - byte [] row = startRows[i]; - if (row == null || row.length <= 0) continue; - Put put = new Put(row).addColumn(FAMILY, QUALIFIER, VALUE1); - success = multiplexer.put(tableName1, put); - assertTrue("multiplexer.put returns", success); - - put = new Put(row).addColumn(FAMILY, QUALIFIER, VALUE1); - success = multiplexer.put(tableName2, put); - assertTrue("multiplexer.put failed", success); - - LOG.info("Put for " + Bytes.toStringBinary(startRows[i]) + " @ iteration " + (i + 1)); - - // verify that the Get returns the correct result - checkExistence(htable1, startRows[i], FAMILY, QUALIFIER); - checkExistence(htable2, startRows[i], FAMILY, QUALIFIER); - } - - // MultiPut case - List multiput = new ArrayList<>(); - for (int i = 0; i < NUM_REGIONS; i++) { - byte [] row = endRows[i]; - if (row == null || row.length <= 0) continue; - Put put = new Put(row); - put.addColumn(FAMILY, QUALIFIER, VALUE2); - multiput.add(put); - } - failedPuts = multiplexer.put(tableName1, multiput); - assertTrue(failedPuts == null); - - // verify that the Get returns the correct result - for (int i = 0; i < NUM_REGIONS; i++) { - byte [] row = endRows[i]; - if (row == null || row.length <= 0) continue; - Get get = new Get(row); - get.addColumn(FAMILY, QUALIFIER); - Result r; - int nbTry = 0; - do { - assertTrue(nbTry++ < 50); - Thread.sleep(100); - r = htable1.get(get); - } while (r == null || r.getValue(FAMILY, QUALIFIER) == null || - Bytes.compareTo(VALUE2, r.getValue(FAMILY, QUALIFIER)) != 0); - } - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java deleted file mode 100644 index cd605aa2e48..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Category({ LargeTests.class, ClientTests.class }) -public class TestHTableMultiplexerFlushCache { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestHTableMultiplexerFlushCache.class); - - private static final Logger LOG = LoggerFactory.getLogger(TestHTableMultiplexerFlushCache.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static byte[] FAMILY = Bytes.toBytes("testFamily"); - private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier_1"); - private static byte[] QUALIFIER2 = Bytes.toBytes("testQualifier_2"); - private static byte[] VALUE1 = Bytes.toBytes("testValue1"); - private static byte[] VALUE2 = Bytes.toBytes("testValue2"); - private static int SLAVES = 3; - private static int PER_REGIONSERVER_QUEUE_SIZE = 100000; - - @Rule - public TestName name = new TestName(); - - /** - * @throws java.lang.Exception - */ - @BeforeClass - public static void setUpBeforeClass() throws Exception { - TEST_UTIL.startMiniCluster(SLAVES); - } - - /** - * @throws java.lang.Exception - */ - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - private static void checkExistence(final Table htable, final byte[] row, final byte[] family, - final byte[] quality, - final byte[] value) throws Exception { - // verify that the Get returns the correct result - TEST_UTIL.waitFor(30000, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - Result r; - Get get = new Get(row); - get.addColumn(family, quality); - r = htable.get(get); - return r != null && r.getValue(family, quality) != null - && Bytes.toStringBinary(value).equals( - Bytes.toStringBinary(r.getValue(family, quality))); - } - }); - } - - @Test - public void testOnRegionChange() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); - final int NUM_REGIONS = 10; - Table htable = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 3, - Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS); - - HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), - PER_REGIONSERVER_QUEUE_SIZE); - - try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) { - byte[][] startRows = r.getStartKeys(); - byte[] row = startRows[1]; - assertTrue("2nd region should not start with empty row", row != null && row.length > 0); - - Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1); - assertTrue("multiplexer.put returns", multiplexer.put(tableName, put)); - - checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1); - - // Now let's shutdown the regionserver and let regions moved to other servers. - HRegionLocation loc = r.getRegionLocation(row); - MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster(); - hbaseCluster.stopRegionServer(loc.getServerName()); - TEST_UTIL.waitUntilAllRegionsAssigned(tableName); - - // put with multiplexer. - put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2); - assertTrue("multiplexer.put returns", multiplexer.put(tableName, put)); - - checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2); - } - } - - @Test - public void testOnRegionMove() throws Exception { - // This test is doing near exactly the same thing that testOnRegionChange but avoiding the - // potential to get a ConnectionClosingException. By moving the region, we can be certain that - // the connection is still valid and that the implementation is correctly handling an invalid - // Region cache (and not just tearing down the entire connection). - final TableName tableName = TableName.valueOf(name.getMethodName()); - final int NUM_REGIONS = 10; - Table htable = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 3, - Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS); - - HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), - PER_REGIONSERVER_QUEUE_SIZE); - - final RegionLocator regionLocator = TEST_UTIL.getConnection().getRegionLocator(tableName); - Pair startEndRows = regionLocator.getStartEndKeys(); - byte[] row = startEndRows.getFirst()[1]; - assertTrue("2nd region should not start with empty row", row != null && row.length > 0); - - Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1); - assertTrue("multiplexer.put returns", multiplexer.put(tableName, put)); - - checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1); - - final HRegionLocation loc = regionLocator.getRegionLocation(row); - final MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster(); - // The current server for the region we're writing to - final ServerName originalServer = loc.getServerName(); - ServerName newServer = null; - // Find a new server to move that region to - for (int i = 0; i < SLAVES; i++) { - HRegionServer rs = hbaseCluster.getRegionServer(i); - if (!rs.getServerName().equals(originalServer.getServerName())) { - newServer = rs.getServerName(); - break; - } - } - assertNotNull("Did not find a new RegionServer to use", newServer); - - // Move the region - LOG.info("Moving " + loc.getRegionInfo().getEncodedName() + " from " + originalServer - + " to " + newServer); - TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), - Bytes.toBytes(newServer.getServerName())); - - TEST_UTIL.waitUntilAllRegionsAssigned(tableName); - - // Send a new Put - put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2); - assertTrue("multiplexer.put returns", multiplexer.put(tableName, put)); - - // We should see the update make it to the new server eventually - checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2); - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocationCaching.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocationCaching.java index 0134276acb2..b877ad79bde 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocationCaching.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocationCaching.java @@ -19,13 +19,11 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; import java.util.List; - import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; @@ -47,7 +45,6 @@ public class TestRegionLocationCaching { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static int SLAVES = 1; - private static int PER_REGIONSERVER_QUEUE_SIZE = 100000; private static TableName TABLE_NAME = TableName.valueOf("TestRegionLocationCaching"); private static byte[] FAMILY = Bytes.toBytes("testFamily"); private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); @@ -64,48 +61,6 @@ public class TestRegionLocationCaching { TEST_UTIL.shutdownMiniCluster(); } - @Test - public void testCachingForHTableMultiplexerSinglePut() throws Exception { - HTableMultiplexer multiplexer = - new HTableMultiplexer(TEST_UTIL.getConfiguration(), PER_REGIONSERVER_QUEUE_SIZE); - byte[] row = Bytes.toBytes("htable_multiplexer_single_put"); - byte[] value = Bytes.toBytes("value"); - - Put put = new Put(row); - put.addColumn(FAMILY, QUALIFIER, value); - assertTrue("Put request not accepted by multiplexer queue", multiplexer.put(TABLE_NAME, put)); - - checkRegionLocationIsCached(TABLE_NAME, multiplexer.getConnection()); - checkExistence(TABLE_NAME, row, FAMILY, QUALIFIER); - - multiplexer.close(); - } - - @Test - public void testCachingForHTableMultiplexerMultiPut() throws Exception { - HTableMultiplexer multiplexer = - new HTableMultiplexer(TEST_UTIL.getConfiguration(), PER_REGIONSERVER_QUEUE_SIZE); - - List multiput = new ArrayList(); - for (int i = 0; i < 10; i++) { - Put put = new Put(Bytes.toBytes("htable_multiplexer_multi_put" + i)); - byte[] value = Bytes.toBytes("value_" + i); - put.addColumn(FAMILY, QUALIFIER, value); - multiput.add(put); - } - - List failedPuts = multiplexer.put(TABLE_NAME, multiput); - assertNull("All put requests were not accepted by multiplexer queue", failedPuts); - - checkRegionLocationIsCached(TABLE_NAME, multiplexer.getConnection()); - for (int i = 0; i < 10; i++) { - checkExistence(TABLE_NAME, Bytes.toBytes("htable_multiplexer_multi_put" + i), FAMILY, - QUALIFIER); - } - - multiplexer.close(); - } - @Test public void testCachingForHTableSinglePut() throws Exception { byte[] row = Bytes.toBytes("htable_single_put");