HBASE-21538 Rewrite RegionReplicaFlushHandler to use AsyncClusterConnection

This commit is contained in:
Duo Zhang 2018-12-12 09:33:33 +08:00 committed by zhangduo
parent 04f737d9bd
commit 7593e86c5f
7 changed files with 106 additions and 90 deletions

View File

@ -17,10 +17,13 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
/** /**
* The asynchronous connection for internal usage. * The asynchronous connection for internal usage.
*/ */
@ -41,4 +44,9 @@ public interface AsyncClusterConnection extends AsyncConnection {
* Get the rpc client we used to communicate with other servers. * Get the rpc client we used to communicate with other servers.
*/ */
RpcClient getRpcClient(); RpcClient getRpcClient();
/**
* Flush a region and get the response.
*/
CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker);
} }

View File

@ -60,6 +60,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@ -384,4 +385,11 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) { public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
return new AsyncRegionServerAdmin(serverName, this); return new AsyncRegionServerAdmin(serverName, this);
} }
@Override
public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
boolean writeFlushWALMarker) {
RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
return admin.flushRegionInternal(regionName, writeFlushWALMarker);
}
} }

View File

@ -18,15 +18,12 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
/** /**
* The factory for creating {@link AsyncClusterConnection}. * The factory for creating {@link AsyncClusterConnection}.
*/ */
@ -48,16 +45,7 @@ public final class ClusterConnectionFactory {
public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf, public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
SocketAddress localAddress, User user) throws IOException { SocketAddress localAddress, User user) throws IOException {
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf); AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
String clusterId; String clusterId = FutureUtils.get(registry.getClusterId());
try {
clusterId = registry.getClusterId().get();
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Throwables.propagateIfPossible(cause, IOException.class);
throw new IOException(cause);
}
return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, user); return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, user);
} }
} }

View File

@ -910,7 +910,19 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override @Override
public CompletableFuture<Void> flushRegion(byte[] regionName) { public CompletableFuture<Void> flushRegion(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>(); return flushRegionInternal(regionName, false).thenAccept(r -> {
});
}
/**
* This method is for internal use only, where we need the response of the flush.
* <p/>
* As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
* API.
*/
CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
boolean writeFlushWALMarker) {
CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
addListener(getRegionLocation(regionName), (location, err) -> { addListener(getRegionLocation(regionName), (location, err) -> {
if (err != null) { if (err != null) {
future.completeExceptionally(err); future.completeExceptionally(err);
@ -922,7 +934,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
return; return;
} }
addListener(flush(serverName, location.getRegion()), (ret, err2) -> { addListener(flush(serverName, location.getRegion(), writeFlushWALMarker), (ret, err2) -> {
if (err2 != null) { if (err2 != null) {
future.completeExceptionally(err2); future.completeExceptionally(err2);
} else { } else {
@ -933,15 +945,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return future; return future;
} }
private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) { private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
return this.<Void> newAdminCaller() boolean writeFlushWALMarker) {
.serverName(serverName) return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
.action( .action((controller, stub) -> this
(controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall( .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker),
.getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done), (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
resp -> null)) .call();
.call();
} }
@Override @Override
@ -954,7 +965,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
} }
List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
if (hRegionInfos != null) { if (hRegionInfos != null) {
hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region))); hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> {
})));
} }
addListener(CompletableFuture.allOf( addListener(CompletableFuture.allOf(
compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {

View File

@ -18,13 +18,10 @@
*/ */
package org.apache.hadoop.hbase.protobuf; package org.apache.hadoop.hbase.protobuf;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
@ -32,12 +29,12 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.io.SizedCellScanner; import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
@ -60,15 +57,7 @@ public class ReplicationProtbufUtil {
throws IOException { throws IOException {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest( Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(
entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir); entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
try { FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond()));
admin.replicateWALEntry(p.getFirst(), p.getSecond()).get();
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Throwables.propagateIfPossible(cause, IOException.class);
throw new IOException(e);
}
} }
/** /**

View File

@ -2409,8 +2409,7 @@ public class HRegionServer extends HasThread implements
// submit it to be handled by one of the handlers so that we do not block OpenRegionHandler // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
if (this.executorService != null) { if (this.executorService != null) {
this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection, this.executorService.submit(new RegionReplicaFlushHandler(this, region));
rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
} }
} }

View File

@ -20,26 +20,23 @@ package org.apache.hadoop.hbase.regionserver.handler;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.FlushRegionCallable;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
/** /**
* HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in
@ -56,20 +53,13 @@ public class RegionReplicaFlushHandler extends EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class); private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class);
private final ClusterConnection connection; private final AsyncClusterConnection connection;
private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
private final RpcControllerFactory rpcControllerFactory;
private final int operationTimeout;
private final HRegion region; private final HRegion region;
public RegionReplicaFlushHandler(Server server, ClusterConnection connection, public RegionReplicaFlushHandler(Server server, HRegion region) {
RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory,
int operationTimeout, HRegion region) {
super(server, EventType.RS_REGION_REPLICA_FLUSH); super(server, EventType.RS_REGION_REPLICA_FLUSH);
this.connection = connection; this.connection = server.getAsyncClusterConnection();
this.rpcRetryingCallerFactory = rpcRetryingCallerFactory;
this.rpcControllerFactory = rpcControllerFactory;
this.operationTimeout = operationTimeout;
this.region = region; this.region = region;
} }
@ -103,7 +93,7 @@ public class RegionReplicaFlushHandler extends EventHandler {
return numRetries; return numRetries;
} }
void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException { void triggerFlushInPrimaryRegion(final HRegion region) throws IOException {
long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE); HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
@ -117,45 +107,59 @@ public class RegionReplicaFlushHandler extends EventHandler {
} }
while (!region.isClosing() && !region.isClosed() while (!region.isClosing() && !region.isClosed()
&& !server.isAborted() && !server.isStopped()) { && !server.isAborted() && !server.isStopped()) {
FlushRegionCallable flushCallable = new FlushRegionCallable(
connection, rpcControllerFactory,
RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true);
// TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we
// do not have to wait for the whole flush here, just initiate it. // do not have to wait for the whole flush here, just initiate it.
FlushRegionResponse response = null; FlushRegionResponse response;
try { try {
response = rpcRetryingCallerFactory.<FlushRegionResponse>newCaller() response = FutureUtils.get(connection.flush(ServerRegionReplicaUtil
.callWithRetries(flushCallable, this.operationTimeout); .getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionName(), true));
} catch (IOException ex) { } catch (IOException e) {
if (ex instanceof TableNotFoundException if (e instanceof TableNotFoundException || FutureUtils
|| connection.isTableDisabled(region.getRegionInfo().getTable())) { .get(connection.getAdmin().isTableDisabled(region.getRegionInfo().getTable()))) {
return; return;
} }
throw ex; if (!counter.shouldRetry()) {
throw e;
}
// The reason that why we need to retry here is that, the retry for asynchronous admin
// request is much simpler than the normal operation, if we failed to locate the region once
// then we will throw the exception out and will not try to relocate again. So here we need
// to add some retries by ourselves to prevent shutting down the region server too
// frequent...
LOG.debug("Failed to trigger a flush of primary region replica {} of region {}, retry={}",
ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
.getRegionNameAsString(),
region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes(), e);
try {
counter.sleepUntilNextRetry();
} catch (InterruptedException e1) {
throw new InterruptedIOException(e1.getMessage());
}
continue;
} }
if (response.getFlushed()) { if (response.getFlushed()) {
// then we have to wait for seeing the flush entry. All reads will be rejected until we see // then we have to wait for seeing the flush entry. All reads will be rejected until we see
// a complete flush cycle or replay a region open event // a complete flush cycle or replay a region open event
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Successfully triggered a flush of primary region replica " LOG.debug("Successfully triggered a flush of primary region replica " +
+ ServerRegionReplicaUtil ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
.getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() .getRegionNameAsString() +
+ " of region " + region.getRegionInfo().getEncodedName() " of region " + region.getRegionInfo().getRegionNameAsString() +
+ " Now waiting and blocking reads until observing a full flush cycle"); " Now waiting and blocking reads until observing a full flush cycle");
} }
region.setReadsEnabled(true); region.setReadsEnabled(true);
break; break;
} else { } else {
if (response.hasWroteFlushWalMarker()) { if (response.hasWroteFlushWalMarker()) {
if(response.getWroteFlushWalMarker()) { if (response.getWroteFlushWalMarker()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary " LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary " +
+ "region replica " + ServerRegionReplicaUtil "region replica " +
.getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+ " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and " .getRegionNameAsString() +
+ "blocking reads until observing a flush marker"); " of region " + region.getRegionInfo().getRegionNameAsString() +
" Now waiting and " + "blocking reads until observing a flush marker");
} }
region.setReadsEnabled(true); region.setReadsEnabled(true);
break; break;
@ -164,15 +168,23 @@ public class RegionReplicaFlushHandler extends EventHandler {
// closing or already flushing. Retry flush again after some sleep. // closing or already flushing. Retry flush again after some sleep.
if (!counter.shouldRetry()) { if (!counter.shouldRetry()) {
throw new IOException("Cannot cause primary to flush or drop a wal marker after " + throw new IOException("Cannot cause primary to flush or drop a wal marker after " +
"retries. Failing opening of this region replica " counter.getAttemptTimes() + " retries. Failing opening of this region replica " +
+ region.getRegionInfo().getEncodedName()); region.getRegionInfo().getRegionNameAsString());
} else {
LOG.warn(
"Cannot cause primary replica {} to flush or drop a wal marker " +
"for region replica {}, retry={}",
ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
.getRegionNameAsString(),
region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes());
} }
} }
} else { } else {
// nothing to do. Are we dealing with an old server? // nothing to do. Are we dealing with an old server?
LOG.warn("Was not able to trigger a flush from primary region due to old server version? " LOG.warn(
+ "Continuing to open the secondary region replica: " "Was not able to trigger a flush from primary region due to old server version? " +
+ region.getRegionInfo().getEncodedName()); "Continuing to open the secondary region replica: " +
region.getRegionInfo().getRegionNameAsString());
region.setReadsEnabled(true); region.setReadsEnabled(true);
break; break;
} }