HBASE-19635 Introduce a thread at RS side to call reportProcedureDone

zhangduo 2017-12-27 20:13:42 +08:00
parent f4703c6ed3
commit 62a4f5bb46
5 changed files with 150 additions and 57 deletions

RegionServerStatus.proto

@@ -146,7 +146,7 @@ message RegionSpaceUseReportRequest {
 message RegionSpaceUseReportResponse {
 }
-message ReportProcedureDoneRequest {
+message RemoteProcedureResult {
   required uint64 proc_id = 1;
   enum Status {
     SUCCESS = 1;
@@ -155,6 +155,9 @@ message ReportProcedureDoneRequest {
   required Status status = 2;
   optional ForeignExceptionMessage error = 3;
 }
+message ReportProcedureDoneRequest {
+  repeated RemoteProcedureResult result = 1;
+}
 message ReportProcedureDoneResponse {
 }
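As an illustration only (not part of the commit): with the messages reshaped this way, one ReportProcedureDoneRequest can now carry the results of several remote procedures. A minimal sketch of how such a request could be assembled from the generated classes; the procedure ids are made up.

import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;

class ReportProcedureDoneRequestExample {

  // Build a request that reports one successful and one failed remote procedure.
  static ReportProcedureDoneRequest buildExample() {
    RemoteProcedureResult ok = RemoteProcedureResult.newBuilder()
        .setProcId(1L) // hypothetical procedure id
        .setStatus(RemoteProcedureResult.Status.SUCCESS)
        .build();
    RemoteProcedureResult failed = RemoteProcedureResult.newBuilder()
        .setProcId(2L) // hypothetical procedure id
        .setStatus(RemoteProcedureResult.Status.ERROR)
        // .setError(...) would attach the ForeignExceptionMessage describing the failure
        .build();
    return ReportProcedureDoneRequest.newBuilder()
        .addResult(ok)
        .addResult(failed)
        .build();
  }
}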

MasterRpcServices.java

@@ -265,6 +265,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
@@ -2254,12 +2255,14 @@ public class MasterRpcServices extends RSRpcServices
   @Override
   public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
       ReportProcedureDoneRequest request) throws ServiceException {
-    if (request.getStatus() == ReportProcedureDoneRequest.Status.SUCCESS) {
-      master.remoteProcedureCompleted(request.getProcId());
-    } else {
-      master.remoteProcedureFailed(request.getProcId(),
-        RemoteProcedureException.fromProto(request.getError()));
-    }
+    request.getResultList().forEach(result -> {
+      if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) {
+        master.remoteProcedureCompleted(result.getProcId());
+      } else {
+        master.remoteProcedureFailed(result.getProcId(),
+          RemoteProcedureException.fromProto(result.getError()));
+      }
+    });
     return ReportProcedureDoneResponse.getDefaultInstance();
   }
 }

HRegionServer.java

@@ -146,7 +146,6 @@ import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
 import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
@@ -380,6 +379,9 @@ public class HRegionServer extends HasThread implements
   // eclipse warning when accessed by inner classes
   protected LogRoller walRoller;
 
+  // A thread which calls reportProcedureDone
+  private RemoteProcedureResultReporter procedureResultReporter;
+
   // flag set after we're done setting up server threads
   final AtomicBoolean online = new AtomicBoolean(false);
@@ -1899,6 +1901,7 @@ public class HRegionServer extends HasThread implements
     this.walRoller = new LogRoller(this, this);
     this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
+    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
 
     // Create the CompactedFileDischarger chore executorService. This chore helps to
     // remove the compacted files
@@ -1942,6 +1945,8 @@ public class HRegionServer extends HasThread implements
     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
       uncaughtExceptionHandler);
     this.cacheFlusher.start(uncaughtExceptionHandler);
+    Threads.setDaemonThreadRunning(this.procedureResultReporter,
+      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
 
     if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
     if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
@@ -3736,55 +3741,26 @@ public class HRegionServer extends HasThread implements
     executorService.submit(new RSProcedureHandler(this, procId, callable));
   }
 
-  public void reportProcedureDone(long procId, Throwable error) {
-    ReportProcedureDoneRequest.Builder builder =
-      ReportProcedureDoneRequest.newBuilder().setProcId(procId);
-    if (error != null) {
-      builder.setStatus(ReportProcedureDoneRequest.Status.ERROR)
-        .setError(ForeignExceptionUtil.toProtoForeignException(serverName.toString(), error));
-    } else {
-      builder.setStatus(ReportProcedureDoneRequest.Status.SUCCESS);
-    }
-    ReportProcedureDoneRequest request = builder.build();
-    int tries = 0;
-    long pauseTime = INIT_PAUSE_TIME_MS;
-    while (keepLooping()) {
-      RegionServerStatusService.BlockingInterface rss = rssStub;
-      try {
-        if (rss == null) {
-          createRegionServerStatusStub();
-          continue;
-        }
-        rss.reportProcedureDone(null, request);
-        // Log if we had to retry else don't log unless TRACE. We want to
-        // know if were successful after an attempt showed in logs as failed.
-        if (tries > 0 || LOG.isTraceEnabled()) {
-          LOG.info("PROCEDURE REPORTED " + request);
-        }
-        return;
-      } catch (ServiceException se) {
-        IOException ioe = ProtobufUtil.getRemoteException(se);
-        boolean pause =
-          ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException;
-        if (pause) {
-          // Do backoff else we flood the Master with requests.
-          pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
-        } else {
-          pauseTime = INIT_PAUSE_TIME_MS; // Reset.
-        }
-        LOG.info(
-          "Failed to report transition " + TextFormat.shortDebugString(request) + "; retry (#" +
-            tries + ")" + (pause ? " after " + pauseTime + "ms delay (Master is coming online...)."
-              : " immediately."),
-          ioe);
-        if (pause) {
-          Threads.sleep(pauseTime);
-        }
-        tries++;
-        if (rssStub == rss) {
-          rssStub = null;
-        }
-      }
-    }
+  public void remoteProcedureComplete(long procId, Throwable error) {
+    procedureResultReporter.complete(procId, error);
+  }
+
+  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
+    RegionServerStatusService.BlockingInterface rss = rssStub;
+    for (;;) {
+      rss = rssStub;
+      if (rss != null) {
+        break;
+      }
+      createRegionServerStatusStub();
+    }
+    try {
+      rss.reportProcedureDone(null, request);
+    } catch (ServiceException se) {
+      if (rssStub == rss) {
+        rssStub = null;
+      }
+      throw ProtobufUtil.getRemoteException(se);
+    }
   }

RemoteProcedureResultReporter.java (new file)

@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;

/**
 * A thread which calls {@code reportProcedureDone} to tell master the result of a remote procedure.
 */
@InterfaceAudience.Private
class RemoteProcedureResultReporter extends Thread {

  private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureResultReporter.class);

  // Time to pause if master says 'please hold'. Make configurable if needed.
  private static final int INIT_PAUSE_TIME_MS = 1000;

  private static final int MAX_BATCH = 100;

  private final HRegionServer server;

  private final LinkedBlockingQueue<RemoteProcedureResult> results = new LinkedBlockingQueue<>();

  public RemoteProcedureResultReporter(HRegionServer server) {
    this.server = server;
  }

  public void complete(long procId, Throwable error) {
    RemoteProcedureResult.Builder builder = RemoteProcedureResult.newBuilder().setProcId(procId);
    if (error != null) {
      builder.setStatus(RemoteProcedureResult.Status.ERROR).setError(
        ForeignExceptionUtil.toProtoForeignException(server.getServerName().toString(), error));
    } else {
      builder.setStatus(RemoteProcedureResult.Status.SUCCESS);
    }
    results.add(builder.build());
  }

  @Override
  public void run() {
    ReportProcedureDoneRequest.Builder builder = ReportProcedureDoneRequest.newBuilder();
    int tries = 0;
    while (!server.isStopped()) {
      if (builder.getResultCount() == 0) {
        try {
          builder.addResult(results.take());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          continue;
        }
      }
      while (builder.getResultCount() < MAX_BATCH) {
        RemoteProcedureResult result = results.poll();
        if (result == null) {
          break;
        }
        builder.addResult(result);
      }
      ReportProcedureDoneRequest request = builder.build();
      try {
        server.reportProcedureDone(builder.build());
        builder.clear();
        tries = 0;
      } catch (IOException e) {
        boolean pause =
          e instanceof ServerNotRunningYetException || e instanceof PleaseHoldException;
        long pauseTime;
        if (pause) {
          // Do backoff else we flood the Master with requests.
          pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
        } else {
          pauseTime = INIT_PAUSE_TIME_MS; // Reset.
        }
        LOG.info("Failed report procedure " + TextFormat.shortDebugString(request) + "; retry (#" +
          tries + ")" + (pause ? " after " + pauseTime + "ms delay (Master is coming online...)."
            : " immediately."),
          e);
        Threads.sleep(pauseTime);
        tries++;
      }
    }
  }
}
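As an aside (not part of the commit): the run() loop above follows a common take-then-drain batching idiom, and it clears the builder only after a successful report so a failed batch is retried together with any newer results. A simplified, dependency-free sketch of the same idea, with hypothetical names:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class BatchingReporterSketch extends Thread {

  private static final int MAX_BATCH = 100;

  private final LinkedBlockingQueue<Long> finishedProcIds = new LinkedBlockingQueue<>();

  private volatile boolean stopped = false;

  // Producer side: handler threads hand in the id of a finished procedure.
  void complete(long procId) {
    finishedProcIds.add(procId);
  }

  void shutdown() {
    stopped = true;
    interrupt();
  }

  @Override
  public void run() {
    List<Long> batch = new ArrayList<>();
    while (!stopped) {
      try {
        if (batch.isEmpty()) {
          // Block until there is at least one result to report.
          batch.add(finishedProcIds.take());
        }
        // Drain whatever else is already queued, up to the batch limit.
        Long next;
        while (batch.size() < MAX_BATCH && (next = finishedProcIds.poll()) != null) {
          batch.add(next);
        }
        report(batch);
        // Clear only after a successful report; on failure the batch is kept
        // and retried on the next iteration, together with newer results.
        batch.clear();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (RuntimeException e) {
        // A real reporter would back off here, as the HBase class above does.
      }
    }
  }

  // Stand-in for the actual RPC to the master.
  private void report(List<Long> batch) {
    System.out.println("Reporting " + batch.size() + " finished procedures: " + batch);
  }
}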

RSProcedureHandler.java

@@ -49,6 +49,6 @@ public class RSProcedureHandler extends EventHandler {
       LOG.error("Catch exception when call RSProcedureCallable: ", e);
       error = e;
     }
-    ((HRegionServer) server).reportProcedureDone(procId, error);
+    ((HRegionServer) server).remoteProcedureComplete(procId, error);
   }
 }