HBASE-15221 Reload the cache on re-tried puts in HTableMultiplexer and add a close() method to HTableMultiplexer
When a Put fails due to a NotServingRegionException, the cached location of that Region is never cleared. Thus, subsequent calls to resubmit the Put will fail in the same way as the original, never determining the new location of the Region. If the Connection is not closed by the user before the Multiplexer is discarded, it will leak resources and could cause resource issues. Signed-off-by: Sean Busbey <busbey@cloudera.com>
This commit is contained in:
parent
eacf7bcf97
commit
dfd8a31a13
|
@ -193,6 +193,11 @@
|
|||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<profiles>
|
||||
|
|
|
@ -19,6 +19,9 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.AbstractMap.SimpleEntry;
|
||||
import java.util.ArrayList;
|
||||
|
@ -50,8 +53,6 @@ import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
|
|||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* HTableMultiplexer provides a thread-safe, non-blocking PUT API across all the tables.
|
||||
* Each put will be sharded into different buffer queues based on its destination region server.
|
||||
|
@ -97,7 +98,18 @@ public class HTableMultiplexer {
|
|||
*/
|
||||
public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
|
||||
throws IOException {
|
||||
this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
|
||||
this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conn The HBase connection.
|
||||
* @param conf The HBase configuration
|
||||
* @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
|
||||
* each region server before dropping the request.
|
||||
*/
|
||||
public HTableMultiplexer(Connection conn, Configuration conf,
|
||||
int perRegionServerBufferQueueSize) {
|
||||
this.conn = (ClusterConnection) conn;
|
||||
this.pool = HTable.getDefaultExecutor(conf);
|
||||
// how many times we could try in total, one more than retry number
|
||||
this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
||||
|
@ -116,6 +128,18 @@ public class HTableMultiplexer {
|
|||
this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already
|
||||
* been closed.
|
||||
* @throws IOException If there is an error closing the connection.
|
||||
*/
|
||||
// NOTE(review): presumably one of the Connection calls below is deprecated — confirm which one
// this suppression is for.
@SuppressWarnings("deprecation")
|
||||
// synchronized: the isClosed() check and the close() call run while holding this
// multiplexer's monitor.
public synchronized void close() throws IOException {
|
||||
// Go through getConnection() rather than the field so tests can substitute a mocked connection.
// Skip the close entirely when the connection already reports itself as closed.
if (!getConnection().isClosed()) {
|
||||
getConnection().close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The put request will be buffered by its corresponding buffer queue. Return false if the queue
|
||||
* is already full.
|
||||
|
@ -170,13 +194,28 @@ public class HTableMultiplexer {
|
|||
* @return true if the request can be accepted by its corresponding buffer queue.
|
||||
*/
|
||||
public boolean put(final TableName tableName, final Put put, int maxAttempts) {
|
||||
// Delegate with reloadCache == false: a Put submitted through the public API looks up its
// region location without asking for the location cache to be reloaded.
return _put(tableName, put, maxAttempts, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal "put" which exposes a boolean flag to control whether or not the region location
|
||||
* cache should be reloaded when trying to queue the {@link Put}.
|
||||
* @param tableName Destination table for the Put
|
||||
* @param put The Put to send
|
||||
* @param maxAttempts Number of attempts to retry the {@code put}
|
||||
* @param reloadCache Should the region location cache be reloaded
|
||||
* @return true if the request was accepted in the queue, otherwise false
|
||||
*/
|
||||
boolean _put(final TableName tableName, final Put put, int maxAttempts, boolean reloadCache) {
|
||||
if (maxAttempts <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
HTable.validatePut(put, maxKeyValueSize);
|
||||
HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
|
||||
// Allow mocking to get at the connection, but don't expose the connection to users.
|
||||
ClusterConnection conn = (ClusterConnection) getConnection();
|
||||
HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), reloadCache);
|
||||
if (loc != null) {
|
||||
// Add the put pair into its corresponding queue.
|
||||
LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
|
||||
|
@ -215,7 +254,8 @@ public class HTableMultiplexer {
|
|||
return new HTableMultiplexerStatus(serverToFlushWorkerMap);
|
||||
}
|
||||
|
||||
private LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
|
||||
@VisibleForTesting
|
||||
LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
|
||||
FlushWorker worker = serverToFlushWorkerMap.get(addr);
|
||||
if (worker == null) {
|
||||
synchronized (this.serverToFlushWorkerMap) {
|
||||
|
@ -232,6 +272,11 @@ public class HTableMultiplexer {
|
|||
return worker.getQueue();
|
||||
}
|
||||
|
||||
/**
* Returns the {@link ClusterConnection} held by this multiplexer. The method is package-private
* so that tests can stub it on a mocked multiplexer, while the connection itself is not exposed
* to users.
* @return the connection stored in {@code conn}
*/
@VisibleForTesting
|
||||
ClusterConnection getConnection() {
|
||||
return this.conn;
|
||||
}
|
||||
|
||||
/**
|
||||
* HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
|
||||
* It reports the number of buffered requests and the number of failed (dropped) requests.
|
||||
|
@ -340,10 +385,11 @@ public class HTableMultiplexer {
|
|||
}
|
||||
}
|
||||
|
||||
private static class PutStatus {
|
||||
private final HRegionInfo regionInfo;
|
||||
private final Put put;
|
||||
private final int maxAttempCount;
|
||||
@VisibleForTesting
|
||||
static class PutStatus {
|
||||
final HRegionInfo regionInfo;
|
||||
final Put put;
|
||||
final int maxAttempCount;
|
||||
|
||||
public PutStatus(HRegionInfo regionInfo, Put put, int maxAttempCount) {
|
||||
this.regionInfo = regionInfo;
|
||||
|
@ -392,7 +438,8 @@ public class HTableMultiplexer {
|
|||
}
|
||||
}
|
||||
|
||||
private static class FlushWorker implements Runnable {
|
||||
@VisibleForTesting
|
||||
static class FlushWorker implements Runnable {
|
||||
private final HRegionLocation addr;
|
||||
private final LinkedBlockingQueue<PutStatus> queue;
|
||||
private final HTableMultiplexer multiplexer;
|
||||
|
@ -440,7 +487,7 @@ public class HTableMultiplexer {
|
|||
return this.maxLatency.getAndSet(0);
|
||||
}
|
||||
|
||||
private boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
|
||||
boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
|
||||
// Decrease the retry count
|
||||
final int retryCount = ps.maxAttempCount - 1;
|
||||
|
||||
|
@ -449,10 +496,10 @@ public class HTableMultiplexer {
|
|||
return false;
|
||||
}
|
||||
|
||||
int cnt = retryInQueue.incrementAndGet();
|
||||
if (cnt > maxRetryInQueue) {
|
||||
int cnt = getRetryInQueue().incrementAndGet();
|
||||
if (cnt > getMaxRetryInQueue()) {
|
||||
// Too many Puts are already queued for resubmission, so give up on this one
|
||||
retryInQueue.decrementAndGet();
|
||||
getRetryInQueue().decrementAndGet();
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -460,22 +507,21 @@ public class HTableMultiplexer {
|
|||
// The current Put has failed, so get the table name for it.
|
||||
final TableName tableName = ps.regionInfo.getTable();
|
||||
|
||||
long delayMs = ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
|
||||
multiplexer.maxAttempts - retryCount - 1);
|
||||
long delayMs = getNextDelay(retryCount);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
|
||||
}
|
||||
|
||||
executor.schedule(new Runnable() {
|
||||
getExecutor().schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
boolean succ = false;
|
||||
try {
|
||||
succ = FlushWorker.this.multiplexer.put(tableName, failedPut, retryCount);
|
||||
succ = FlushWorker.this.getMultiplexer()._put(tableName, failedPut, retryCount, true);
|
||||
} finally {
|
||||
FlushWorker.this.retryInQueue.decrementAndGet();
|
||||
FlushWorker.this.getRetryInQueue().decrementAndGet();
|
||||
if (!succ) {
|
||||
FlushWorker.this.totalFailedPutCount.incrementAndGet();
|
||||
FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -483,6 +529,37 @@ public class HTableMultiplexer {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
* Computes how long to wait before a failed Put is resubmitted. The result is used by
* {@code resubmitFailedPut} as the delay (logged in milliseconds) when scheduling the retry.
* Kept as a separate package-private method so tests can stub the delay.
* @param retryCount the number of attempts left for the Put being resubmitted
* @return the value of {@code ConnectionUtils.getPauseTime} for the multiplexer's flush period
*         and {@code multiplexer.maxAttempts - retryCount - 1}
*/
@VisibleForTesting
|
||||
long getNextDelay(int retryCount) {
|
||||
return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
|
||||
multiplexer.maxAttempts - retryCount - 1);
|
||||
}
|
||||
|
||||
/**
* Returns the counter that {@code resubmitFailedPut} increments before scheduling a retry and
* decrements once the scheduled retry has run (or when the retry is refused). Accessed through
* this getter so tests can supply their own counter.
* @return the {@code retryInQueue} counter of this worker
*/
@VisibleForTesting
|
||||
AtomicInteger getRetryInQueue() {
|
||||
return this.retryInQueue;
|
||||
}
|
||||
|
||||
/**
* Returns the limit that {@code resubmitFailedPut} compares the incremented retry counter
* against; when the counter exceeds this value the Put is not resubmitted.
* @return the {@code maxRetryInQueue} value of this worker
*/
@VisibleForTesting
|
||||
int getMaxRetryInQueue() {
|
||||
return this.maxRetryInQueue;
|
||||
}
|
||||
|
||||
/**
* Returns the failure counter that the scheduled retry in {@code resubmitFailedPut} increments
* when the resubmitted Put is not accepted by the multiplexer.
* @return the {@code totalFailedPutCount} counter of this worker
*/
@VisibleForTesting
|
||||
AtomicLong getTotalFailedPutCount() {
|
||||
return this.totalFailedPutCount;
|
||||
}
|
||||
|
||||
/**
* Returns the multiplexer this worker belongs to. The scheduled retry in
* {@code resubmitFailedPut} calls {@code _put} on it; the getter lets tests return a mock.
* @return the {@code multiplexer} of this worker
*/
@VisibleForTesting
|
||||
HTableMultiplexer getMultiplexer() {
|
||||
return this.multiplexer;
|
||||
}
|
||||
|
||||
/**
* Returns the executor on which {@code resubmitFailedPut} schedules the delayed retry of a
* failed Put; the getter lets tests return a mock executor.
* @return the {@code executor} of this worker
*/
@VisibleForTesting
|
||||
ScheduledExecutorService getExecutor() {
|
||||
return this.executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
int failedCount = 0;
|
||||
|
|
|
@ -0,0 +1,193 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.HTableMultiplexer.FlushWorker;
|
||||
import org.apache.hadoop.hbase.client.HTableMultiplexer.PutStatus;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Mockito.doCallRealMethod;
|
||||
import static org.mockito.Mockito.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestHTableMultiplexerViaMocks {
|
||||
|
||||
private static final int NUM_RETRIES = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
|
||||
private HTableMultiplexer mockMultiplexer;
|
||||
private ClusterConnection mockConnection;
|
||||
private HRegionLocation mockRegionLocation;
|
||||
private HRegionInfo mockRegionInfo;
|
||||
|
||||
private TableName tableName;
|
||||
private Put put;
|
||||
|
||||
/**
* Creates fresh mocks and a sample three-cell {@link Put} before every test, and installs the
* stubbing shared by all tests: the public {@code put} runs its real implementation, the
* multiplexer hands out the mocked connection, and the mocked region location/info resolve to
* {@code tableName}.
*/
@Before
|
||||
public void setupTest() {
|
||||
mockMultiplexer = mock(HTableMultiplexer.class);
|
||||
mockConnection = mock(ClusterConnection.class);
|
||||
mockRegionLocation = mock(HRegionLocation.class);
|
||||
mockRegionInfo = mock(HRegionInfo.class);
|
||||
|
||||
tableName = TableName.valueOf("my_table");
|
||||
put = new Put(getBytes("row1"));
|
||||
put.addColumn(getBytes("f1"), getBytes("q1"), getBytes("v11"));
|
||||
put.addColumn(getBytes("f1"), getBytes("q2"), getBytes("v12"));
|
||||
put.addColumn(getBytes("f2"), getBytes("q1"), getBytes("v21"));
|
||||
|
||||
// Call the real put(TableName, Put, int) method
|
||||
when(mockMultiplexer.put(any(TableName.class), any(Put.class), anyInt())).thenCallRealMethod();
|
||||
|
||||
// Return the mocked ClusterConnection
|
||||
when(mockMultiplexer.getConnection()).thenReturn(mockConnection);
|
||||
|
||||
// Return the regionInfo from the region location
|
||||
when(mockRegionLocation.getRegionInfo()).thenReturn(mockRegionInfo);
|
||||
|
||||
// Make sure this RegionInfo points to our table
|
||||
when(mockRegionInfo.getTable()).thenReturn(tableName);
|
||||
}
|
||||
|
||||
/**
* Verifies that the public {@code put(TableName, Put, int)} delegates to {@code _put} with the
* same arguments and {@code reloadCache} set to {@code false}.
*/
@Test public void useCacheOnInitialPut() throws Exception {
|
||||
// put() is stubbed in setupTest() to run its real implementation on the mock.
mockMultiplexer.put(tableName, put, NUM_RETRIES);
|
||||
|
||||
verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES, false);
|
||||
}
|
||||
|
||||
/**
* Verifies that when the connection returns a region location for the Put's row, the real
* {@code _put} adds exactly one {@link PutStatus} to that location's queue, carrying the
* original Put and the location's region info.
*/
@Test public void nonNullLocationQueuesPut() throws Exception {
|
||||
final LinkedBlockingQueue<PutStatus> queue = new LinkedBlockingQueue<>();
|
||||
|
||||
// Call the real method for _put(TableName, Put, int, boolean)
|
||||
when(mockMultiplexer._put(any(TableName.class), any(Put.class), anyInt(), anyBoolean())).thenCallRealMethod();
|
||||
|
||||
// Return a region location
|
||||
// (stubbed for reload == false, which is what the public put() passes down)
when(mockConnection.getRegionLocation(tableName, put.getRow(), false)).thenReturn(mockRegionLocation);
|
||||
// Hand back a queue owned by the test so its contents can be inspected afterwards
when(mockMultiplexer.getQueue(mockRegionLocation)).thenReturn(queue);
|
||||
|
||||
assertTrue("Put should have been queued", mockMultiplexer.put(tableName, put, NUM_RETRIES));
|
||||
|
||||
assertEquals(1, queue.size());
|
||||
final PutStatus ps = queue.take();
|
||||
assertEquals(put, ps.put);
|
||||
assertEquals(mockRegionInfo, ps.regionInfo);
|
||||
}
|
||||
|
||||
/**
* Verifies that the real {@code resubmitFailedPut} re-queues a failed Put through
* {@code _put} with {@code reloadCache} set to {@code true} and one fewer attempt, and that
* after a successful resubmit the retry counter is back at zero and no failure is recorded.
*/
@Test public void ignoreCacheOnRetriedPut() throws Exception {
|
||||
FlushWorker mockFlushWorker = mock(FlushWorker.class);
|
||||
ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class);
|
||||
final AtomicInteger retryInQueue = new AtomicInteger(0);
|
||||
final AtomicLong totalFailedPuts = new AtomicLong(0L);
|
||||
final int maxRetryInQueue = 20;
|
||||
final long delay = 100L;
|
||||
|
||||
final PutStatus ps = new PutStatus(mockRegionInfo, put, NUM_RETRIES);
|
||||
|
||||
// Call the real resubmitFailedPut(PutStatus, HRegionLocation) method
|
||||
when(mockFlushWorker.resubmitFailedPut(any(PutStatus.class), any(HRegionLocation.class))).thenCallRealMethod();
|
||||
// Succeed on the re-submit that asks for the region location cache to be reloaded.
// resubmitFailedPut passes on one fewer attempt than the PutStatus had, hence NUM_RETRIES - 1.
|
||||
when(mockMultiplexer._put(tableName, put, NUM_RETRIES - 1, true)).thenReturn(true);
|
||||
|
||||
// Stub out the getters for resubmitFailedPut(PutStatus, HRegionLocation)
|
||||
when(mockFlushWorker.getExecutor()).thenReturn(mockExecutor);
|
||||
when(mockFlushWorker.getNextDelay(anyInt())).thenReturn(delay);
|
||||
when(mockFlushWorker.getMultiplexer()).thenReturn(mockMultiplexer);
|
||||
when(mockFlushWorker.getRetryInQueue()).thenReturn(retryInQueue);
|
||||
when(mockFlushWorker.getMaxRetryInQueue()).thenReturn(maxRetryInQueue);
|
||||
when(mockFlushWorker.getTotalFailedPutCount()).thenReturn(totalFailedPuts);
|
||||
|
||||
// When a Runnable is scheduled, run that Runnable
|
||||
// (inline, on the test thread, so the assertions below see its effects)
when(mockExecutor.schedule(any(Runnable.class), eq(delay), eq(TimeUnit.MILLISECONDS))).thenAnswer(
|
||||
new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
// Before we run this, should have one retry in progress.
|
||||
assertEquals(1L, retryInQueue.get());
|
||||
|
||||
Object[] args = invocation.getArguments();
|
||||
assertEquals(3, args.length);
|
||||
assertTrue("Argument should be an instance of Runnable", args[0] instanceof Runnable);
|
||||
Runnable runnable = (Runnable) args[0];
|
||||
runnable.run();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
// The put should be rescheduled
|
||||
assertTrue("Put should have been rescheduled", mockFlushWorker.resubmitFailedPut(ps, mockRegionLocation));
|
||||
|
||||
verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES - 1, true);
|
||||
assertEquals(0L, totalFailedPuts.get());
|
||||
// Net result should be zero (added one before rerunning, subtracted one after running).
|
||||
assertEquals(0L, retryInQueue.get());
|
||||
}
|
||||
|
||||
/**
* Verifies that the real {@code close()} closes the underlying connection when that connection
* reports it is still open.
*/
@SuppressWarnings("deprecation")
|
||||
@Test public void testConnectionClosing() throws IOException {
|
||||
// close() returns void, so the real method is enabled with the doCallRealMethod() form
doCallRealMethod().when(mockMultiplexer).close();
|
||||
// If the connection is not closed
|
||||
when(mockConnection.isClosed()).thenReturn(false);
|
||||
|
||||
mockMultiplexer.close();
|
||||
|
||||
// We should close it
|
||||
verify(mockConnection).close();
|
||||
}
|
||||
|
||||
/**
* Verifies that the real {@code close()} does not call {@code close()} on the underlying
* connection when that connection already reports itself as closed.
*/
@SuppressWarnings("deprecation")
|
||||
@Test public void testClosingAlreadyClosedConnection() throws IOException {
|
||||
// close() returns void, so the real method is enabled with the doCallRealMethod() form
doCallRealMethod().when(mockMultiplexer).close();
|
||||
// If the connection is already closed
|
||||
when(mockConnection.isClosed()).thenReturn(true);
|
||||
|
||||
mockMultiplexer.close();
|
||||
|
||||
// We should not close it again
|
||||
verify(mockConnection, times(0)).close();
|
||||
}
|
||||
|
||||
/**
* Encodes a string as bytes; used by the test setup to build the row, column family,
* qualifier and value arguments of the sample Put.
* @param str the text to encode
|
||||
* @return UTF-8 byte representation for {@code str}
|
||||
*/
|
||||
private static byte[] getBytes(String str) {
|
||||
return str.getBytes(UTF_8);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue