HBASE-12988 [Replication]Parallel apply edits across regions.

This commit is contained in:
Lars Hofhansl 2015-09-04 13:22:14 -07:00
parent de0e72f464
commit 6a8ba22c16
3 changed files with 120 additions and 15 deletions

View File

@ -1146,6 +1146,12 @@ public final class HConstants {
/** Configuration key for setting replication codec class name */
public static final String REPLICATION_CODEC_CONF_KEY = "hbase.replication.rpc.codec";
/** Maximum number of threads used by the replication source for shipping edits to the sinks */
public static final String REPLICATION_SOURCE_MAXTHREADS_KEY =
"hbase.replication.source.maxthreads";
public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
/** Config for pluggable consensus provider */
public static final String HBASE_COORDINATED_STATE_MANAGER_CLASS =
"hbase.coordinated.state.manager.class";

View File

@ -1537,6 +1537,17 @@ possible configurations would overwhelm and obscure the important.
using KeyValueCodecWithTags for replication when there are no tags causes no harm.
</description>
</property>
<property>
<name>hbase.replication.source.maxthreads</name>
<value>10</value>
<description>
The maximum number of threads any replication source will use for
shipping edits to the sinks in parallel. This also limits the number of
chunks each replication batch is broken into.
Larger values can improve the replication throughput between the master and
slave clusters. The default of 10 will rarely need to be changed.
</description>
</property>
<!-- Static Web User Filter properties. -->
<property>
<description>

View File

@ -21,7 +21,15 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,6 +42,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
@ -71,6 +80,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// Handles connecting to peer region servers
private ReplicationSinkManager replicationSinkMgr;
private boolean peersSelected = false;
private ThreadPoolExecutor exec;
private int maxThreads;
@Override
public void init(Context context) throws IOException {
@ -89,6 +100,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.metrics = context.getMetrics();
// ReplicationQueueInfo parses the peerId out of the znode for us
this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
// per sink thread pool
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
this.exec = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
private void decorateConf() {
@ -139,32 +155,71 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
public boolean replicate(ReplicateContext replicateContext) {
List<Entry> entries = replicateContext.getEntries();
int sleepMultiplier = 1;
while (this.isRunning()) {
if (!peersSelected) {
connectToPeers();
peersSelected = true;
}
if (!peersSelected && this.isRunning()) {
connectToPeers();
peersSelected = true;
}
// minimum of: configured threads, number of 100-waledit batches,
// and number of current sinks
int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1),
replicationSinkMgr.getSinks().size());
List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
if (n == 1) {
entryLists.add(entries);
} else {
for (int i=0; i<n; i++) {
entryLists.add(new ArrayList<Entry>(entries.size()/n+1));
}
// now group by region
for (Entry e : entries) {
entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
}
}
while (this.isRunning()) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
SinkPeer sinkPeer = null;
try {
sinkPeer = replicationSinkMgr.getReplicationSink();
BlockingInterface rrs = sinkPeer.getRegionServer();
if (LOG.isTraceEnabled()) {
LOG.trace("Replicating " + entries.size() +
" entries of total size " + replicateContext.getSize());
}
ReplicationProtbufUtil.replicateWALEntry(rrs,
entries.toArray(new Entry[entries.size()]));
List<Future<Integer>> futures = new ArrayList<Future<Integer>>(entryLists.size());
for (int i=0; i<entryLists.size(); i++) {
if (!entryLists.get(i).isEmpty()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Submitting " + entryLists.get(i).size() +
" entries of total size " + replicateContext.getSize());
}
// RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
futures.add(exec.submit(new Replicator(entryLists.get(i), i)));
}
}
IOException iox = null;
for (Future<Integer> f : futures) {
try {
// wait for all futures, remove successful parts
// (only the remaining parts will be retried)
entryLists.remove(f.get());
} catch (InterruptedException ie) {
iox = new IOException(ie);
} catch (ExecutionException ee) {
// cause must be an IOException
iox = (IOException)ee.getCause();
}
}
if (iox != null) {
// if we had any exceptions, try again
throw iox;
}
// update metrics
this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
replicationSinkMgr.reportSinkSuccess(sinkPeer);
return true;
} catch (IOException ioe) {
@ -195,10 +250,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
LOG.warn("Can't replicate because of a local or network error: ", ioe);
}
}
if (sinkPeer != null) {
replicationSinkMgr.reportBadSink(sinkPeer);
}
if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
sleepMultiplier++;
}
@ -222,6 +273,43 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
LOG.warn("Failed to close the connection");
}
}
exec.shutdownNow();
notifyStopped();
}
// is this needed? Nobody else will call doStop() otherwise
@Override
public State stopAndWait() {
doStop();
return super.stopAndWait();
}
private class Replicator implements Callable<Integer> {
private List<Entry> entries;
private int ordinal;
public Replicator(List<Entry> entries, int ordinal) {
this.entries = entries;
this.ordinal = ordinal;
}
@Override
public Integer call() throws IOException {
SinkPeer sinkPeer = null;
try {
sinkPeer = replicationSinkMgr.getReplicationSink();
BlockingInterface rrs = sinkPeer.getRegionServer();
ReplicationProtbufUtil.replicateWALEntry(rrs,
entries.toArray(new Entry[entries.size()]));
replicationSinkMgr.reportSinkSuccess(sinkPeer);
return ordinal;
} catch (IOException ioe) {
if (sinkPeer != null) {
replicationSinkMgr.reportBadSink(sinkPeer);
}
throw ioe;
}
}
}
}