HBASE-12104 Some optimization and bugfix for HTableMultiplexer (Yi Deng)
parent 3acdf06827
commit bc4f25ff45
@@ -28,23 +28,28 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables.
  * Each put will be sharded into different buffer queues based on its destination region server.
@@ -63,24 +68,25 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 @InterfaceStability.Evolving
 public class HTableMultiplexer {
   private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
-  private static int poolID = 0;

-  static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = "hbase.tablemultiplexer.flush.frequency.ms";
-
-  /** The map between each region server to its corresponding buffer queue */
-  private final Map<HRegionLocation, LinkedBlockingQueue<PutStatus>> serverToBufferQueueMap =
-      new ConcurrentHashMap<>();
+  public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
+      "hbase.tablemultiplexer.flush.period.ms";
+  public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
+  public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
+      "hbase.client.max.retries.in.queue";

   /** The map between each region server to its flush worker */
-  private final Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap =
+  private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
       new ConcurrentHashMap<>();

   private final Configuration conf;
   private final ClusterConnection conn;
   private final ExecutorService pool;
   private final int retryNum;
-  private int perRegionServerBufferQueueSize;
+  private final int perRegionServerBufferQueueSize;
   private final int maxKeyValueSize;
+  private final ScheduledExecutorService executor;
+  private final long flushPeriod;

   /**
    * @param conf The HBaseConfiguration
@@ -96,6 +102,11 @@ public class HTableMultiplexer {
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
     this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
+    this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
+    int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
+    this.executor =
+        Executors.newScheduledThreadPool(initThreads,
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
   }

   /**
@@ -106,7 +117,7 @@ public class HTableMultiplexer {
    * @return true if the request can be accepted by its corresponding buffer queue.
    * @throws IOException
    */
-  public boolean put(TableName tableName, final Put put) throws IOException {
+  public boolean put(TableName tableName, final Put put) {
     return put(tableName, put, this.retryNum);
   }

@@ -118,8 +129,7 @@ public class HTableMultiplexer {
    * @return the list of puts which could not be queued
    * @throws IOException
    */
-  public List<Put> put(TableName tableName, final List<Put> puts)
-      throws IOException {
+  public List<Put> put(TableName tableName, final List<Put> puts) {
     if (puts == null)
       return null;

@@ -140,23 +150,22 @@ public class HTableMultiplexer {
     return failedPuts;
   }

-  public List<Put> put(byte[] tableName, final List<Put> puts) throws IOException {
+  /**
+   * Deprecated. Use {@link #put(TableName, List) } instead.
+   */
+  @Deprecated
+  public List<Put> put(byte[] tableName, final List<Put> puts) {
     return put(TableName.valueOf(tableName), puts);
   }

-
   /**
    * The put request will be buffered by its corresponding buffer queue. And the put request will be
    * retried before dropping the request.
    * Return false if the queue is already full.
-   * @param tableName
-   * @param put
-   * @param retry
    * @return true if the request can be accepted by its corresponding buffer queue.
    * @throws IOException
    */
-  public boolean put(final TableName tableName, final Put put, int retry)
-      throws IOException {
+  public boolean put(final TableName tableName, final Put put, int retry) {
     if (retry <= 0) {
       return false;
     }
@@ -166,24 +175,35 @@ public class HTableMultiplexer {
       HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
       if (loc != null) {
         // Add the put pair into its corresponding queue.
         LinkedBlockingQueue<PutStatus> queue = getQueue(loc);

         // Generate a MultiPutStatus object and offer it into the queue
         PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);

         return queue.offer(s);
       }
-    } catch (Exception e) {
-      LOG.debug("Cannot process the put " + put + " because of " + e);
+    } catch (IOException e) {
+      LOG.debug("Cannot process the put " + put, e);
     }
     return false;
   }

-  public boolean put(final byte[] tableName, final Put put, int retry)
-      throws IOException {
+  /**
+   * Deprecated. Use {@link #put(TableName, Put) } instead.
+   */
+  @Deprecated
+  public boolean put(final byte[] tableName, final Put put, int retry) {
     return put(TableName.valueOf(tableName), put, retry);
   }

+  /**
+   * Deprecated. Use {@link #put(TableName, Put)} instead.
+   */
+  @Deprecated
+  public boolean put(final byte[] tableName, Put put) {
+    return put(TableName.valueOf(tableName), put);
+  }
+
   /**
    * @return the current HTableMultiplexerStatus
    */
@@ -192,30 +212,20 @@ public class HTableMultiplexer {
   }

   private LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
-    LinkedBlockingQueue<PutStatus> queue = serverToBufferQueueMap.get(addr);
-    if (queue == null) {
-      synchronized (this.serverToBufferQueueMap) {
-        queue = serverToBufferQueueMap.get(addr);
-        if (queue == null) {
-          // Create a queue for the new region server
-          queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
-          serverToBufferQueueMap.put(addr, queue);
-
+    FlushWorker worker = serverToFlushWorkerMap.get(addr);
+    if (worker == null) {
+      synchronized (this.serverToFlushWorkerMap) {
+        worker = serverToFlushWorkerMap.get(addr);
+        if (worker == null) {
           // Create the flush worker
-          HTableFlushWorker worker =
-              new HTableFlushWorker(conf, this.conn, addr, this, queue, pool);
+          worker = new FlushWorker(conf, this.conn, addr, this, perRegionServerBufferQueueSize,
+              pool, executor);
           this.serverToFlushWorkerMap.put(addr, worker);
-
-          // Launch a daemon thread to flush the puts
-          // from the queue to its corresponding region server.
-          String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-" + (poolID++);
-          Thread t = new Thread(worker, name);
-          t.setDaemon(true);
-          t.start();
+          executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
         }
       }
     }
-    return queue;
+    return worker.getQueue();
   }

   /**
@@ -223,7 +233,7 @@ public class HTableMultiplexer {
    * report the number of buffered requests and the number of the failed (dropped) requests
    * in total or on per region server basis.
    */
-  static class HTableMultiplexerStatus {
+  public static class HTableMultiplexerStatus {
     private long totalFailedPutCounter;
     private long totalBufferedPutCounter;
     private long maxLatency;
@@ -234,7 +244,7 @@ public class HTableMultiplexer {
     private Map<String, Long> serverToMaxLatencyMap;

     public HTableMultiplexerStatus(
-        Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
+        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
       this.totalBufferedPutCounter = 0;
       this.totalFailedPutCounter = 0;
       this.maxLatency = 0;
@@ -247,17 +257,17 @@ public class HTableMultiplexer {
     }

     private void initialize(
-        Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
+        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
       if (serverToFlushWorkerMap == null) {
         return;
       }

       long averageCalcSum = 0;
       int averageCalcCount = 0;
-      for (Map.Entry<HRegionLocation, HTableFlushWorker> entry : serverToFlushWorkerMap
+      for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap
           .entrySet()) {
         HRegionLocation addr = entry.getKey();
-        HTableFlushWorker worker = entry.getValue();
+        FlushWorker worker = entry.getValue();

         long bufferedCounter = worker.getTotalBufferedCount();
         long failedCounter = worker.getTotalFailedCount();
@@ -325,25 +335,15 @@ public class HTableMultiplexer {
   }

   private static class PutStatus {
-    private final HRegionInfo regionInfo;
-    private final Put put;
-    private final int retryCount;
-    public PutStatus(final HRegionInfo regionInfo, final Put put,
-        final int retryCount) {
+    public final HRegionInfo regionInfo;
+    public final Put put;
+    public final int retryCount;
+    public PutStatus(HRegionInfo regionInfo, Put put, int retryCount) {
       this.regionInfo = regionInfo;
       this.put = put;
       this.retryCount = retryCount;
     }

-    public HRegionInfo getRegionInfo() {
-      return regionInfo;
-    }
-    public Put getPut() {
-      return put;
-    }
-    public int getRetryCount() {
-      return retryCount;
-    }
   }

   /**
@@ -386,26 +386,38 @@ public class HTableMultiplexer {
     }
   }

-  private static class HTableFlushWorker implements Runnable {
+  private static class FlushWorker implements Runnable {
     private final HRegionLocation addr;
-    private final Configuration conf;
-    private final ClusterConnection conn;
+    private final AsyncProcess asyncProc;
     private final LinkedBlockingQueue<PutStatus> queue;
-    private final HTableMultiplexer htableMultiplexer;
+    private final HTableMultiplexer multiplexer;
     private final AtomicLong totalFailedPutCount = new AtomicLong(0);
-    private final AtomicInteger currentProcessingPutCount = new AtomicInteger(0);
+    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
     private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
     private final AtomicLong maxLatency = new AtomicLong(0);
     private final ExecutorService pool;
+    private final List<PutStatus> processingList = new ArrayList<>();
+    private final ScheduledExecutorService executor;
+    private final int maxRetryInQueue;
+    private final AtomicInteger retryInQueue = new AtomicInteger(0);
+    private final int rpcTimeOutMs;

-    public HTableFlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
-        HTableMultiplexer htableMultiplexer, LinkedBlockingQueue<PutStatus> queue, ExecutorService pool) {
+    public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
+        HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
+        ExecutorService pool, ScheduledExecutorService executor) {
       this.addr = addr;
-      this.conf = conf;
-      this.conn = conn;
-      this.htableMultiplexer = htableMultiplexer;
-      this.queue = queue;
+      this.asyncProc = conn.getAsyncProcess();
+      this.multiplexer = htableMultiplexer;
+      this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
       this.pool = pool;
+      this.executor = executor;
+      this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
+      this.rpcTimeOutMs =
+          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    }
+
+    protected LinkedBlockingQueue<PutStatus> getQueue() {
+      return this.queue;
     }

     public long getTotalFailedCount() {
@@ -413,7 +425,7 @@ public class HTableMultiplexer {
     }

     public long getTotalBufferedCount() {
-      return queue.size() + currentProcessingPutCount.get();
+      return queue.size() + currentProcessingCount.get();
     }

     public AtomicAverageCounter getAverageLatencyCounter() {
@@ -424,149 +436,146 @@ public class HTableMultiplexer {
       return this.maxLatency.getAndSet(0);
     }

-    private boolean resubmitFailedPut(PutStatus failedPutStatus,
-        HRegionLocation oldLoc) throws IOException {
-      Put failedPut = failedPutStatus.getPut();
-      // The currentPut is failed. So get the table name for the currentPut.
-      TableName tableName = failedPutStatus.getRegionInfo().getTable();
+    private boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
       // Decrease the retry count
-      int retryCount = failedPutStatus.getRetryCount() - 1;
+      final int retryCount = ps.retryCount - 1;

       if (retryCount <= 0) {
         // Update the failed counter and no retry any more.
         return false;
-      } else {
-        // Retry one more time
-        return this.htableMultiplexer.put(tableName, failedPut, retryCount);
       }
+
+      int cnt = retryInQueue.incrementAndGet();
+      if (cnt > maxRetryInQueue) {
+        // Too many Puts in queue for resubmit, give up this
+        retryInQueue.decrementAndGet();
+        return false;
+      }
+
+      final Put failedPut = ps.put;
+      // The currentPut is failed. So get the table name for the currentPut.
+      final TableName tableName = ps.regionInfo.getTable();
+
+      // Wait at least RPC timeout time
+      long delayMs = rpcTimeOutMs;
+      delayMs = Math.max(delayMs, (long) (multiplexer.flushPeriod * Math.pow(2,
+          multiplexer.retryNum - retryCount)));
+
+      executor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          boolean succ = false;
+          try {
+            succ = FlushWorker.this.multiplexer.put(tableName, failedPut, retryCount);
+          } finally {
+            FlushWorker.this.retryInQueue.decrementAndGet();
+            if (!succ) {
+              FlushWorker.this.totalFailedPutCount.incrementAndGet();
+            }
+          }
+        }
+      }, delayMs, TimeUnit.MILLISECONDS);
+      return true;
     }

     @Override
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings
-        (value = "REC_CATCH_EXCEPTION", justification = "na")
     public void run() {
-      List<PutStatus> processingList = new ArrayList<>();
-      /**
-       * The frequency in milliseconds for the current thread to process the corresponding
-       * buffer queue.
-       **/
-      long frequency = conf.getLong(TABLE_MULTIPLEXER_FLUSH_FREQ_MS, 100);
-
-      // initial delay
-      try {
-        Thread.sleep(frequency);
-      } catch (InterruptedException e) {
-        LOG.warn("Interrupted while sleeping");
-        Thread.currentThread().interrupt();
-      }
-
-      AsyncProcess ap = conn.getAsyncProcess();
-
-      long start, elapsed;
       int failedCount = 0;
-      while (true) {
-        try {
-          start = elapsed = EnvironmentEdgeManager.currentTime();
-
-          // Clear the processingList, putToStatusMap and failedCount
-          processingList.clear();
-          failedCount = 0;
-
-          // drain all the queued puts into the tmp list
-          queue.drainTo(processingList);
-          currentProcessingPutCount.set(processingList.size());
-
-          if (processingList.size() > 0) {
-            List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
-            MultiAction<Row> actions = new MultiAction<>();
-            for (int i = 0; i < processingList.size(); i++) {
-              PutStatus putStatus = processingList.get(i);
-              Action<Row> action = new Action<Row>(putStatus.getPut(), i);
-              actions.add(putStatus.getRegionInfo().getRegionName(), action);
-              retainedActions.add(action);
-            }
-
-            // Process this multi-put request
-            List<PutStatus> failed = null;
-            Object[] results = new Object[actions.size()];
-            ServerName server = addr.getServerName();
-            Map<ServerName, MultiAction<Row>> actionsByServer =
-                Collections.singletonMap(server, actions);
-            try {
-              AsyncRequestFuture arf =
-                  ap.submitMultiActions(null, retainedActions, 0L, null, results,
-                      true, null, null, actionsByServer, pool);
-              arf.waitUntilDone();
-              if (arf.hasError()) {
-                throw arf.getErrors();
-              }
-            } catch (IOException e) {
-              LOG.debug("Caught some exceptions " + e
-                  + " when flushing puts to region server " + addr.getHostnamePort());
-            } finally {
-              // mutate list so that it is empty for complete success, or
-              // contains only failed records
-              // results are returned in the same order as the requests in list
-              // walk the list backwards, so we can remove from list without
-              // impacting the indexes of earlier members
-              for (int i = 0; i < results.length; i++) {
-                if (results[i] == null) {
-                  if (failed == null) {
-                    failed = new ArrayList<PutStatus>();
-                  }
-                  failed.add(processingList.get(i));
-                }
-              }
-            }
-
-            if (failed != null) {
-              // Resubmit failed puts
-              for (PutStatus putStatus : processingList) {
-                if (!resubmitFailedPut(putStatus, this.addr)) {
-                  failedCount++;
-                }
-              }
-              // Update the totalFailedCount
-              this.totalFailedPutCount.addAndGet(failedCount);
-            }
-
-            elapsed = EnvironmentEdgeManager.currentTime() - start;
-            // Update latency counters
-            averageLatency.add(elapsed);
-            if (elapsed > maxLatency.get()) {
-              maxLatency.set(elapsed);
-            }
-
-            // Log some basic info
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Processed " + currentProcessingPutCount
-                  + " put requests for " + addr.getHostnamePort() + " and "
-                  + failedCount + " failed" + ", latency for this send: "
-                  + elapsed);
-            }
-
-            // Reset the current processing put count
-            currentProcessingPutCount.set(0);
-          }
-
-          // Sleep for a while
-          if (elapsed == start) {
-            elapsed = EnvironmentEdgeManager.currentTime() - start;
-          }
-          if (elapsed < frequency) {
-            try {
-              Thread.sleep(frequency - elapsed);
-            } catch (InterruptedException e) {
-              LOG.warn("Interrupted while sleeping");
-              Thread.currentThread().interrupt();
-            }
-          }
-        } catch (Exception e) {
-          // Log all the exceptions and move on
-          LOG.debug("Caught some exceptions " + e
-              + " when flushing puts to region server "
-              + addr.getHostnamePort(), e);
-        }
+      try {
+        long start = EnvironmentEdgeManager.currentTime();
+
+        // drain all the queued puts into the tmp list
+        processingList.clear();
+        queue.drainTo(processingList);
+        if (processingList.size() == 0) {
+          // Nothing to flush
+          return;
+        }
+
+        currentProcessingCount.set(processingList.size());
+        // failedCount is decreased whenever a Put is success or resubmit.
+        failedCount = processingList.size();
+
+        List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
+        MultiAction<Row> actions = new MultiAction<>();
+        for (int i = 0; i < processingList.size(); i++) {
+          PutStatus putStatus = processingList.get(i);
+          Action<Row> action = new Action<Row>(putStatus.put, i);
+          actions.add(putStatus.regionInfo.getRegionName(), action);
+          retainedActions.add(action);
+        }
+
+        // Process this multi-put request
+        List<PutStatus> failed = null;
+        Object[] results = new Object[actions.size()];
+        ServerName server = addr.getServerName();
+        Map<ServerName, MultiAction<Row>> actionsByServer =
+            Collections.singletonMap(server, actions);
+        try {
+          AsyncRequestFuture arf =
+              asyncProc.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
+                null, actionsByServer, pool);
+          arf.waitUntilDone();
+          if (arf.hasError()) {
+            // We just log and ignore the exception here since failed Puts will be resubmit again.
+            LOG.debug("Caught some exceptions when flushing puts to region server "
+                + addr.getHostnamePort(), arf.getErrors());
+          }
+        } finally {
+          for (int i = 0; i < results.length; i++) {
+            if (results[i] == null) {
+              if (failed == null) {
+                failed = new ArrayList<PutStatus>();
+              }
+              failed.add(processingList.get(i));
+            } else {
+              failedCount--;
+            }
+          }
+        }
+
+        if (failed != null) {
+          // Resubmit failed puts
+          for (PutStatus putStatus : processingList) {
+            if (resubmitFailedPut(putStatus, this.addr)) {
+              failedCount--;
+            }
+          }
+        }
+
+        long elapsed = EnvironmentEdgeManager.currentTime() - start;
+        // Update latency counters
+        averageLatency.add(elapsed);
+        if (elapsed > maxLatency.get()) {
+          maxLatency.set(elapsed);
+        }
+
+        // Log some basic info
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Processed " + currentProcessingCount + " put requests for "
+              + addr.getHostnamePort() + " and " + failedCount + " failed"
+              + ", latency for this send: " + elapsed);
+        }
+
+        // Reset the current processing put count
+        currentProcessingCount.set(0);
+      } catch (RuntimeException e) {
+        // To make findbugs happy
+        // Log all the exceptions and move on
+        LOG.debug(
+          "Caught some exceptions " + e + " when flushing puts to region server "
+              + addr.getHostnamePort(), e);
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        // Log all the exceptions and move on
+        LOG.debug(
+          "Caught some exceptions " + e + " when flushing puts to region server "
+              + addr.getHostnamePort(), e);
+      } finally {
+        // Update the totalFailedCount
+        this.totalFailedPutCount.addAndGet(failedCount);
       }
     }
   }
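
For orientation, here is a minimal, hypothetical usage sketch of the reworked client class. Only the non-blocking put(TableName, Put) overload, the hbase.tablemultiplexer.* / hbase.client.max.retries.in.queue keys, and their defaults come from the diff above; the HTableMultiplexer(Configuration, int) constructor and the HBaseConfiguration/Put/Bytes helpers are assumed from the surrounding HBase client API and are not part of this change.

// Sketch only, not part of the commit. Assumes the HTableMultiplexer(Configuration, int)
// constructor and standard HBase client helpers; the config keys and the boolean-returning
// put(TableName, Put) are taken from the diff above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTableMultiplexer;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiplexerUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Tuning knobs introduced or renamed by this change; values shown are the in-code defaults.
    conf.setLong(HTableMultiplexer.TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
    conf.setInt(HTableMultiplexer.TABLE_MULTIPLEXER_INIT_THREADS, 10);
    conf.setInt(HTableMultiplexer.TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);

    // Buffer up to 1000 queued Puts per region server (assumed constructor signature).
    HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 1000);

    Put put = new Put(Bytes.toBytes("row-1"));
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));

    // Non-blocking: returns false when the target region server's queue is full,
    // so the caller decides whether to back off, retry, or drop the Put.
    boolean queued = multiplexer.put(TableName.valueOf("test_table"), put);
    if (!queued) {
      // back off and retry later, or count the Put as dropped
    }
  }
}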