Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
644f820c6c
commit
eea2ea8576
|
@ -477,8 +477,8 @@ public class HRegionServer extends Thread implements
|
||||||
protected String useThisHostnameInstead;
|
protected String useThisHostnameInstead;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated since 2.4.0 and will be removed in 4.0.0.
|
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||||
* Use {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
|
* {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
|
||||||
* @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
|
* @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
|
@ -727,9 +727,10 @@ public class HRegionServer extends Thread implements
|
||||||
String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
|
String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
|
||||||
if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
|
if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
|
||||||
if (!StringUtils.isBlank(hostname)) {
|
if (!StringUtils.isBlank(hostname)) {
|
||||||
String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + UNSAFE_RS_HOSTNAME_KEY +
|
String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " +
|
||||||
" are mutually exclusive. Do not set " + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
|
UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set " +
|
||||||
" to true while " + UNSAFE_RS_HOSTNAME_KEY + " is used";
|
UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while " +
|
||||||
|
UNSAFE_RS_HOSTNAME_KEY + " is used";
|
||||||
throw new IOException(msg);
|
throw new IOException(msg);
|
||||||
} else {
|
} else {
|
||||||
return rpcServices.isa.getHostName();
|
return rpcServices.isa.getHostName();
|
||||||
|
@ -862,7 +863,8 @@ public class HRegionServer extends Thread implements
|
||||||
|
|
||||||
coprocessorServiceHandlers.put(serviceName, instance);
|
coprocessorServiceHandlers.put(serviceName, instance);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Registered regionserver coprocessor executorService: executorService=" + serviceName);
|
LOG.debug("Registered regionserver coprocessor executorService: executorService=" +
|
||||||
|
serviceName);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -877,13 +879,15 @@ public class HRegionServer extends Thread implements
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run test on configured codecs to make sure supporting libs are in place.
|
* Run test on configured codecs to make sure supporting libs are in place.
|
||||||
* @param c
|
* @param c configuration object
|
||||||
* @throws IOException
|
* @throws IOException if compression test fails for any regionserver codec
|
||||||
*/
|
*/
|
||||||
private static void checkCodecs(final Configuration c) throws IOException {
|
private static void checkCodecs(final Configuration c) throws IOException {
|
||||||
// check to see if the codec list is available:
|
// check to see if the codec list is available:
|
||||||
String [] codecs = c.getStrings(REGIONSERVER_CODEC, (String[])null);
|
String [] codecs = c.getStrings(REGIONSERVER_CODEC, (String[])null);
|
||||||
if (codecs == null) return;
|
if (codecs == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
for (String codec : codecs) {
|
for (String codec : codecs) {
|
||||||
if (!CompressionTest.testCompression(codec)) {
|
if (!CompressionTest.testCompression(codec)) {
|
||||||
throw new IOException("Compression codec " + codec +
|
throw new IOException("Compression codec " + codec +
|
||||||
|
@ -1138,9 +1142,15 @@ public class HRegionServer extends Thread implements
|
||||||
|
|
||||||
// Send interrupts to wake up threads if sleeping so they notice shutdown.
|
// Send interrupts to wake up threads if sleeping so they notice shutdown.
|
||||||
// TODO: Should we check they are alive? If OOME could have exited already
|
// TODO: Should we check they are alive? If OOME could have exited already
|
||||||
if (this.hMemManager != null) this.hMemManager.stop();
|
if (this.hMemManager != null) {
|
||||||
if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
|
this.hMemManager.stop();
|
||||||
if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
|
}
|
||||||
|
if (this.cacheFlusher != null) {
|
||||||
|
this.cacheFlusher.interruptIfNecessary();
|
||||||
|
}
|
||||||
|
if (this.compactSplitThread != null) {
|
||||||
|
this.compactSplitThread.interruptIfNecessary();
|
||||||
|
}
|
||||||
|
|
||||||
// Stop the snapshot and other procedure handlers, forcefully killing all running tasks
|
// Stop the snapshot and other procedure handlers, forcefully killing all running tasks
|
||||||
if (rspmHost != null) {
|
if (rspmHost != null) {
|
||||||
|
@ -1247,7 +1257,9 @@ public class HRegionServer extends Thread implements
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean areAllUserRegionsOffline() {
|
private boolean areAllUserRegionsOffline() {
|
||||||
if (getNumberOfOnlineRegions() > 2) return false;
|
if (getNumberOfOnlineRegions() > 2) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
boolean allUserRegionsOffline = true;
|
boolean allUserRegionsOffline = true;
|
||||||
for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
|
for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
|
||||||
if (!e.getValue().getRegionInfo().isMetaRegion()) {
|
if (!e.getValue().getRegionInfo().isMetaRegion()) {
|
||||||
|
@ -1469,7 +1481,9 @@ public class HRegionServer extends Thread implements
|
||||||
private String getOnlineRegionsAsPrintableString() {
|
private String getOnlineRegionsAsPrintableString() {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
for (Region r: this.onlineRegions.values()) {
|
for (Region r: this.onlineRegions.values()) {
|
||||||
if (sb.length() > 0) sb.append(", ");
|
if (sb.length() > 0) {
|
||||||
|
sb.append(", ");
|
||||||
|
}
|
||||||
sb.append(r.getRegionInfo().getEncodedName());
|
sb.append(r.getRegionInfo().getEncodedName());
|
||||||
}
|
}
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
|
@ -1577,7 +1591,7 @@ public class HRegionServer extends Thread implements
|
||||||
* @param c Extra configuration.
|
* @param c Extra configuration.
|
||||||
*/
|
*/
|
||||||
protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
|
protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
boolean updateRootDir = false;
|
boolean updateRootDir = false;
|
||||||
for (NameStringPair e : c.getMapEntriesList()) {
|
for (NameStringPair e : c.getMapEntriesList()) {
|
||||||
|
@ -1873,7 +1887,7 @@ public class HRegionServer extends Thread implements
|
||||||
this.instance.compactSplitThread.requestCompaction(hr, s,
|
this.instance.compactSplitThread.requestCompaction(hr, s,
|
||||||
getName() + " requests major compaction; use default priority",
|
getName() + " requests major compaction; use default priority",
|
||||||
Store.NO_PRIORITY,
|
Store.NO_PRIORITY,
|
||||||
CompactionLifeCycleTracker.DUMMY, null);
|
CompactionLifeCycleTracker.DUMMY, null);
|
||||||
} else {
|
} else {
|
||||||
this.instance.compactSplitThread.requestCompaction(hr, s,
|
this.instance.compactSplitThread.requestCompaction(hr, s,
|
||||||
getName() + " requests major compaction; use configured priority",
|
getName() + " requests major compaction; use configured priority",
|
||||||
|
@ -1908,7 +1922,9 @@ public class HRegionServer extends Thread implements
|
||||||
protected void chore() {
|
protected void chore() {
|
||||||
final StringBuilder whyFlush = new StringBuilder();
|
final StringBuilder whyFlush = new StringBuilder();
|
||||||
for (HRegion r : this.server.onlineRegions.values()) {
|
for (HRegion r : this.server.onlineRegions.values()) {
|
||||||
if (r == null) continue;
|
if (r == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (r.shouldFlush(whyFlush)) {
|
if (r.shouldFlush(whyFlush)) {
|
||||||
FlushRequester requester = server.getFlushRequester();
|
FlushRequester requester = server.getFlushRequester();
|
||||||
if (requester != null) {
|
if (requester != null) {
|
||||||
|
@ -2012,7 +2028,7 @@ public class HRegionServer extends Thread implements
|
||||||
// Health checker thread.
|
// Health checker thread.
|
||||||
if (isHealthCheckerConfigured()) {
|
if (isHealthCheckerConfigured()) {
|
||||||
int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
|
int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
|
||||||
HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
|
HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
|
||||||
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
|
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2502,7 +2518,9 @@ public class HRegionServer extends Thread implements
|
||||||
" after " + pauseTime + "ms delay (Master is coming online...).":
|
" after " + pauseTime + "ms delay (Master is coming online...).":
|
||||||
" immediately."),
|
" immediately."),
|
||||||
ioe);
|
ioe);
|
||||||
if (pause) Threads.sleep(pauseTime);
|
if (pause) {
|
||||||
|
Threads.sleep(pauseTime);
|
||||||
|
}
|
||||||
tries++;
|
tries++;
|
||||||
if (rssStub == rss) {
|
if (rssStub == rss) {
|
||||||
rssStub = null;
|
rssStub = null;
|
||||||
|
@ -2612,7 +2630,7 @@ public class HRegionServer extends Thread implements
|
||||||
/**
|
/**
|
||||||
* Sets the abort state if not already set.
|
* Sets the abort state if not already set.
|
||||||
* @return True if abortRequested set to True successfully, false if an abort is already in
|
* @return True if abortRequested set to True successfully, false if an abort is already in
|
||||||
* progress.
|
* progress.
|
||||||
*/
|
*/
|
||||||
protected boolean setAbortRequested() {
|
protected boolean setAbortRequested() {
|
||||||
return abortRequested.compareAndSet(false, true);
|
return abortRequested.compareAndSet(false, true);
|
||||||
|
@ -2689,7 +2707,9 @@ public class HRegionServer extends Thread implements
|
||||||
if (this.compactSplitThread != null) {
|
if (this.compactSplitThread != null) {
|
||||||
this.compactSplitThread.join();
|
this.compactSplitThread.join();
|
||||||
}
|
}
|
||||||
if (this.executorService != null) this.executorService.shutdown();
|
if (this.executorService != null) {
|
||||||
|
this.executorService.shutdown();
|
||||||
|
}
|
||||||
if (this.replicationSourceHandler != null &&
|
if (this.replicationSourceHandler != null &&
|
||||||
this.replicationSourceHandler == this.replicationSinkHandler) {
|
this.replicationSourceHandler == this.replicationSinkHandler) {
|
||||||
this.replicationSourceHandler.stopReplicationService();
|
this.replicationSourceHandler.stopReplicationService();
|
||||||
|
@ -2704,8 +2724,7 @@ public class HRegionServer extends Thread implements
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Return the object that implements the replication
|
* @return Return the object that implements the replication source executorService.
|
||||||
* source executorService.
|
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public ReplicationSourceService getReplicationSourceService() {
|
public ReplicationSourceService getReplicationSourceService() {
|
||||||
|
@ -2714,7 +2733,7 @@ public class HRegionServer extends Thread implements
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Return the object that implements the replication
|
* @return Return the object that implements the replication
|
||||||
* sink executorService.
|
* sink executorService.
|
||||||
*/
|
*/
|
||||||
ReplicationSinkService getReplicationSinkService() {
|
ReplicationSinkService getReplicationSinkService() {
|
||||||
return replicationSinkHandler;
|
return replicationSinkHandler;
|
||||||
|
@ -2810,8 +2829,8 @@ public class HRegionServer extends Thread implements
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return True if we should break loop because cluster is going down or
|
* @return True if we should break loop because cluster is going down or this server has been
|
||||||
* this server has been stopped or hdfs has gone bad.
|
* stopped or hdfs has gone bad.
|
||||||
*/
|
*/
|
||||||
private boolean keepLooping() {
|
private boolean keepLooping() {
|
||||||
return !this.stopped && isClusterUp();
|
return !this.stopped && isClusterUp();
|
||||||
|
@ -2825,10 +2844,14 @@ public class HRegionServer extends Thread implements
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private RegionServerStartupResponse reportForDuty() throws IOException {
|
private RegionServerStartupResponse reportForDuty() throws IOException {
|
||||||
if (this.masterless) return RegionServerStartupResponse.getDefaultInstance();
|
if (this.masterless) {
|
||||||
|
return RegionServerStartupResponse.getDefaultInstance();
|
||||||
|
}
|
||||||
ServerName masterServerName = createRegionServerStatusStub(true);
|
ServerName masterServerName = createRegionServerStatusStub(true);
|
||||||
RegionServerStatusService.BlockingInterface rss = rssStub;
|
RegionServerStatusService.BlockingInterface rss = rssStub;
|
||||||
if (masterServerName == null || rss == null) return null;
|
if (masterServerName == null || rss == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
RegionServerStartupResponse result = null;
|
RegionServerStartupResponse result = null;
|
||||||
try {
|
try {
|
||||||
rpcServices.requestCount.reset();
|
rpcServices.requestCount.reset();
|
||||||
|
@ -2906,12 +2929,16 @@ public class HRegionServer extends Thread implements
|
||||||
if (hri.isMetaRegion()) {
|
if (hri.isMetaRegion()) {
|
||||||
meta = e.getValue();
|
meta = e.getValue();
|
||||||
}
|
}
|
||||||
if (meta != null) break;
|
if (meta != null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
this.onlineRegionsLock.writeLock().unlock();
|
this.onlineRegionsLock.writeLock().unlock();
|
||||||
}
|
}
|
||||||
if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
|
if (meta != null) {
|
||||||
|
closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3218,17 +3245,17 @@ public class HRegionServer extends Thread implements
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<HRegion> getRegions(TableName tableName) {
|
public List<HRegion> getRegions(TableName tableName) {
|
||||||
List<HRegion> tableRegions = new ArrayList<>();
|
List<HRegion> tableRegions = new ArrayList<>();
|
||||||
synchronized (this.onlineRegions) {
|
synchronized (this.onlineRegions) {
|
||||||
for (HRegion region: this.onlineRegions.values()) {
|
for (HRegion region: this.onlineRegions.values()) {
|
||||||
RegionInfo regionInfo = region.getRegionInfo();
|
RegionInfo regionInfo = region.getRegionInfo();
|
||||||
if(regionInfo.getTable().equals(tableName)) {
|
if(regionInfo.getTable().equals(tableName)) {
|
||||||
tableRegions.add(region);
|
tableRegions.add(region);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return tableRegions;
|
return tableRegions;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<HRegion> getRegions() {
|
public List<HRegion> getRegions() {
|
||||||
|
@ -3402,13 +3429,16 @@ public class HRegionServer extends Thread implements
|
||||||
if (closeSeqNum == HConstants.NO_SEQNUM) {
|
if (closeSeqNum == HConstants.NO_SEQNUM) {
|
||||||
// No edits in WAL for this region; get the sequence number when the region was opened.
|
// No edits in WAL for this region; get the sequence number when the region was opened.
|
||||||
closeSeqNum = r.getOpenSeqNum();
|
closeSeqNum = r.getOpenSeqNum();
|
||||||
if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
|
if (closeSeqNum == HConstants.NO_SEQNUM) {
|
||||||
|
closeSeqNum = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
|
boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
|
||||||
addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
|
addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
|
||||||
if (selfMove) {
|
if (selfMove) {
|
||||||
this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(r.getRegionInfo().getEncodedName()
|
this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
|
||||||
, new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
|
r.getRegionInfo().getEncodedName(),
|
||||||
|
new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
|
this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
|
||||||
|
@ -3523,7 +3553,7 @@ public class HRegionServer extends Thread implements
|
||||||
* Return the favored nodes for a region given its encoded name. Look at the
|
* Return the favored nodes for a region given its encoded name. Look at the
|
||||||
* comment around {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[]
|
* comment around {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[]
|
||||||
* here.
|
* here.
|
||||||
* @param encodedRegionName
|
* @param encodedRegionName the encoded region name.
|
||||||
* @return array of favored locations
|
* @return array of favored locations
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@ -3543,7 +3573,7 @@ public class HRegionServer extends Thread implements
|
||||||
MovedRegionInfo(ServerName serverName, long closeSeqNum) {
|
MovedRegionInfo(ServerName serverName, long closeSeqNum) {
|
||||||
this.serverName = serverName;
|
this.serverName = serverName;
|
||||||
this.seqNum = closeSeqNum;
|
this.seqNum = closeSeqNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ServerName getServerName() {
|
public ServerName getServerName() {
|
||||||
return serverName;
|
return serverName;
|
||||||
|
@ -3560,7 +3590,8 @@ public class HRegionServer extends Thread implements
|
||||||
*/
|
*/
|
||||||
private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
|
private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
|
||||||
|
|
||||||
private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum, boolean selfMove) {
|
private void addToMovedRegions(String encodedName, ServerName destination,
|
||||||
|
long closeSeqNum, boolean selfMove) {
|
||||||
if (selfMove) {
|
if (selfMove) {
|
||||||
LOG.warn("Not adding moved region record: " + encodedName + " to self.");
|
LOG.warn("Not adding moved region record: " + encodedName + " to self.");
|
||||||
return;
|
return;
|
||||||
|
@ -3581,7 +3612,7 @@ public class HRegionServer extends Thread implements
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public int movedRegionCacheExpiredTime() {
|
public int movedRegionCacheExpiredTime() {
|
||||||
return TIMEOUT_REGION_MOVED;
|
return TIMEOUT_REGION_MOVED;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getMyEphemeralNodePath() {
|
private String getMyEphemeralNodePath() {
|
||||||
|
@ -3609,8 +3640,8 @@ public class HRegionServer extends Thread implements
|
||||||
String serviceName = call.getServiceName();
|
String serviceName = call.getServiceName();
|
||||||
com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName);
|
com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName);
|
||||||
if (service == null) {
|
if (service == null) {
|
||||||
throw new UnknownProtocolException(null, "No registered coprocessor executorService found for " +
|
throw new UnknownProtocolException(null,
|
||||||
serviceName);
|
"No registered coprocessor executorService found for " + serviceName);
|
||||||
}
|
}
|
||||||
com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
|
com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
|
||||||
service.getDescriptorForType();
|
service.getDescriptorForType();
|
||||||
|
|
Loading…
Reference in New Issue