HBASE-25067 Edit of log messages around async WAL Replication; checkstyle fixes; and a bugfix (#2435)
Editing logging around region replicas: shortening and adding context. Checkstyle fixes in edited files while I was in there.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
commit 70a947dc6b (parent d1a3b66074)
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information

@@ -60,7 +60,7 @@ public class ReplicationPeerImpl implements ReplicationPeer {
       SyncReplicationState newSyncReplicationState) {
     this.conf = conf;
     this.id = id;
-    this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
+    setPeerState(peerState);
     this.peerConfig = peerConfig;
     this.syncReplicationStateBits =
       syncReplicationState.value() | (newSyncReplicationState.value() << SHIFT);
@@ -1787,7 +1787,7 @@ public class HMaster extends HRegionServer implements MasterServices {
         toPrint = regionsInTransition.subList(0, max);
         truncated = true;
       }
-      LOG.info(prefix + "unning balancer because " + regionsInTransition.size() +
+      LOG.info(prefix + " not running balancer because " + regionsInTransition.size() +
         " region(s) in transition: " + toPrint + (truncated? "(truncated list)": ""));
       if (!force || metaInTransition) return false;
     }
@@ -142,9 +142,9 @@ public class EnableTableProcedure
           }
         } else {
           // the replicasFound is less than the regionReplication
-          LOG.info("Number of replicas has increased. Assigning new region replicas." +
+          LOG.info("Number of replicas has increased for {}. Assigning new region replicas." +
             "The previous replica count was {}. The current replica count is {}.",
-            (currentMaxReplica + 1), configuredReplicaCount);
+            this.tableName, (currentMaxReplica + 1), configuredReplicaCount);
           regionsOfTable = RegionReplicaUtil.addReplicas(regionsOfTable,
             currentMaxReplica + 1, configuredReplicaCount);
         }
@@ -2438,11 +2438,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     status.setStatus("Acquiring readlock on region");
     // block waiting for the lock for flushing cache
     lock.readLock().lock();
+    boolean flushed = true;
     try {
       if (this.closed.get()) {
         String msg = "Skipping flush on " + this + " because closed";
         LOG.debug(msg);
         status.abort(msg);
+        flushed = false;
         return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
       }
       if (coprocessorHost != null) {

@@ -2459,15 +2461,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       if (!writestate.flushing && writestate.writesEnabled) {
         this.writestate.flushing = true;
       } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("NOT flushing memstore for region " + this
-            + ", flushing=" + writestate.flushing + ", writesEnabled="
-            + writestate.writesEnabled);
-        }
-        String msg = "Not flushing since "
-          + (writestate.flushing ? "already flushing"
-            : "writes not enabled");
+        String msg = "NOT flushing " + this + " as " + (writestate.flushing ? "already flushing"
+          : "writes are not enabled");
+        LOG.debug(msg);
         status.abort(msg);
+        flushed = false;
         return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
       }
     }

@@ -2505,8 +2503,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     } finally {
       lock.readLock().unlock();
-      LOG.debug("Flush status journal for {}:\n{}", this.getRegionInfo().getEncodedName(),
-        status.prettyPrintJournal());
+      if (flushed) {
+        // Don't log this journal stuff if no flush -- confusing.
+        LOG.debug("Flush status journal for {}:\n{}", this.getRegionInfo().getEncodedName(),
+          status.prettyPrintJournal());
+      }
       status.cleanup();
     }
   }

@@ -5032,7 +5033,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi

   public void setReadsEnabled(boolean readsEnabled) {
     if (readsEnabled && !this.writestate.readsEnabled) {
-      LOG.info(getRegionInfo().getEncodedName() + " : Enabling reads for region.");
+      LOG.info("Enabling reads for {}", getRegionInfo().getEncodedName());
     }
     this.writestate.setReadsEnabled(readsEnabled);
   }
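Note on the HRegion hunks above: the new flushed flag only gates the flush-journal debug line, so the journal is printed when a flush actually ran. A minimal, self-contained sketch of that guard pattern (class, method, and message names here are illustrative, not the HBase internals):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class FlushJournalSketch {
      private static final Logger LOG = LoggerFactory.getLogger(FlushJournalSketch.class);

      String flush(boolean closed, boolean writesEnabled) {
        boolean flushed = true; // assume success until a bail-out path flips it
        try {
          if (closed) {
            flushed = false; // nothing flushed: region already closed
            return "CANNOT_FLUSH: closed";
          }
          if (!writesEnabled) {
            flushed = false; // nothing flushed: writes disabled or already flushing
            return "CANNOT_FLUSH: writes not enabled";
          }
          return "FLUSHED";
        } finally {
          if (flushed) {
            // Only print the journal when work was done; otherwise it is just noise.
            LOG.debug("Flush status journal: ...");
          }
        }
      }
    }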
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information

@@ -123,6 +123,8 @@ import org.apache.hadoop.hbase.master.MasterRpcServicesVersionWrapper;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.mob.MobFileCache;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
+import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
 import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;

@@ -139,8 +141,6 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
-import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
-import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
 import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;

@@ -2466,10 +2466,13 @@ public class HRegionServer extends Thread implements
       region.setReadsEnabled(false); // disable reads before marking the region as opened.
       // RegionReplicaFlushHandler might reset this.

-      // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
+      // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
       if (this.executorService != null) {
         this.executorService.submit(new RegionReplicaFlushHandler(this, region));
-      }
+      } else {
+        LOG.info("Executor is null; not running flush of primary region replica for {}",
+          region.getRegionInfo());
+      }
     }

   @Override
@@ -92,7 +92,7 @@ public class AssignRegionHandler extends EventHandler {
     String regionName = regionInfo.getRegionNameAsString();
     Region onlineRegion = rs.getRegion(encodedName);
     if (onlineRegion != null) {
-      LOG.warn("Received OPEN for the region:{}, which is already online", regionName);
+      LOG.warn("Received OPEN for {} which is already online", regionName);
       // Just follow the old behavior, do we need to call reportRegionStateTransition? Maybe not?
       // For normal case, it could happen that the rpc call to schedule this handler is succeeded,
       // but before returning to master the connection is broken. And when master tries again, we

@@ -104,7 +104,7 @@ public class AssignRegionHandler extends EventHandler {
     if (previous != null) {
       if (previous) {
         // The region is opening and this maybe a retry on the rpc call, it is safe to ignore it.
-        LOG.info("Receiving OPEN for the region:{}, which we are already trying to OPEN" +
+        LOG.info("Receiving OPEN for {} which we are already trying to OPEN" +
           " - ignoring this new request for this region.", regionName);
       } else {
         // The region is closing. This is possible as we will update the region state to CLOSED when

@@ -113,7 +113,7 @@ public class AssignRegionHandler extends EventHandler {
         // closing process.
         long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
         LOG.info(
-          "Receiving OPEN for the region:{}, which we are trying to close, try again after {}ms",
+          "Receiving OPEN for {} which we are trying to close, try again after {}ms",
           regionName, backoff);
         rs.getExecutorService().delayedSubmit(this, backoff, TimeUnit.MILLISECONDS);
       }

@@ -145,11 +145,10 @@ public class AssignRegionHandler extends EventHandler {
     Boolean current = rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes());
     if (current == null) {
       // Should NEVER happen, but let's be paranoid.
-      LOG.error("Bad state: we've just opened a region that was NOT in transition. Region={}",
-        regionName);
+      LOG.error("Bad state: we've just opened {} which was NOT in transition", regionName);
     } else if (!current) {
       // Should NEVER happen, but let's be paranoid.
-      LOG.error("Bad state: we've just opened a region that was closing. Region={}", regionName);
+      LOG.error("Bad state: we've just opened {} which was closing", regionName);
     }
   }

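Most of the log edits in this patch (here and in the other handlers) move toward the SLF4J parameterized style: shorter messages with {} placeholders instead of string concatenation, so the message is only assembled if the level is enabled. A hedged example of the two styles side by side (the regionName variable and the sketch class are illustrative, not from the patch):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class LogStyleSketch {
      private static final Logger LOG = LoggerFactory.getLogger(LogStyleSketch.class);

      void report(String regionName) {
        // Concatenation builds the full string even when INFO is disabled:
        LOG.info("Received OPEN for the region:" + regionName + ", which is already online");

        // Parameterized form defers formatting to the logger and reads more tersely:
        LOG.info("Received OPEN for {} which is already online", regionName);
      }
    }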
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information

@@ -39,9 +39,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;

 /**
- * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in
+ * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to WAL in
  * secondary region replicas. This means that a secondary region replica can serve some edits from
- * it's memstore that that is still not flushed from primary. We do not want to allow secondary
+ * it's memstore that are still not flushed from primary. We do not want to allow secondary
  * region's seqId to go back in time, when this secondary region is opened elsewhere after a
  * crash or region move. We will trigger a flush cache in the primary region replica and wait
  * for observing a complete flush cycle before marking the region readsEnabled. This handler does

@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegion
  */
 @InterfaceAudience.Private
 public class RegionReplicaFlushHandler extends EventHandler {
-
   private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class);

   private final AsyncClusterConnection connection;

@@ -73,7 +72,7 @@ public class RegionReplicaFlushHandler extends EventHandler {
     if (t instanceof InterruptedIOException || t instanceof InterruptedException) {
       LOG.error("Caught throwable while processing event " + eventType, t);
     } else if (t instanceof RuntimeException) {
-      server.abort("ServerAborting because a runtime exception was thrown", t);
+      server.abort("Server aborting", t);
     } else {
       // something fishy since we cannot flush the primary region until all retries (retries from
       // rpc times 35 trigger). We cannot close the region since there is no such mechanism to

@@ -101,9 +100,9 @@ public class RegionReplicaFlushHandler extends EventHandler {
     RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create();

     if (LOG.isDebugEnabled()) {
-      LOG.debug("Attempting to do an RPC to the primary region replica " + ServerRegionReplicaUtil
-        .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region "
-        + region.getRegionInfo().getEncodedName() + " to trigger a flush");
+      LOG.debug("RPC'ing to primary region replica " +
+        ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) + " from " +
+        region.getRegionInfo() + " to trigger FLUSH");
     }
     while (!region.isClosing() && !region.isClosed()
       && !server.isAborted() && !server.isStopped()) {

@@ -142,11 +141,11 @@ public class RegionReplicaFlushHandler extends EventHandler {
         // then we have to wait for seeing the flush entry. All reads will be rejected until we see
         // a complete flush cycle or replay a region open event
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Successfully triggered a flush of primary region replica " +
+          LOG.debug("Triggered flush of primary region replica " +
             ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
               .getRegionNameAsString() +
-            " of region " + region.getRegionInfo().getRegionNameAsString() +
-            " Now waiting and blocking reads until observing a full flush cycle");
+            " for " + region.getRegionInfo().getEncodedName() +
+            "; now waiting and blocking reads until completes a full flush cycle");
         }
         region.setReadsEnabled(true);
         break;

@@ -154,12 +153,10 @@ public class RegionReplicaFlushHandler extends EventHandler {
         if (response.hasWroteFlushWalMarker()) {
           if (response.getWroteFlushWalMarker()) {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary " +
-                "region replica " +
-                ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
-                  .getRegionNameAsString() +
-                " of region " + region.getRegionInfo().getRegionNameAsString() +
-                " Now waiting and " + "blocking reads until observing a flush marker");
+              LOG.debug("Triggered empty flush marker (memstore empty) on primary region replica " +
+                ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()).
+                  getRegionNameAsString() + " for " + region.getRegionInfo().getEncodedName() +
+                "; now waiting and blocking reads until observing a flush marker");
             }
             region.setReadsEnabled(true);
             break;
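The RegionReplicaFlushHandler above keeps re-issuing the flush RPC to the primary replica, backing off between attempts, until it observes a complete flush cycle (or a flush marker) and can re-enable reads. A generic sketch of that retry shape, with no HBase types and purely hypothetical method names:

    import java.util.concurrent.TimeUnit;

    final class PrimaryFlushRetrySketch {
      // Keep asking the primary to flush, backing off between attempts, until it succeeds
      // or the attempt budget is exhausted.
      boolean flushWithRetries(int maxAttempts, long basePauseMs) throws InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
          if (triggerFlushOnPrimary()) {
            return true; // a full flush cycle (or flush marker) was observed
          }
          long backoffMs = basePauseMs * (1L << Math.min(attempt, 10)); // capped exponential backoff
          TimeUnit.MILLISECONDS.sleep(backoffMs);
        }
        return false;
      }

      // Stand-in for the RPC to the primary region replica; hypothetical.
      private boolean triggerFlushOnPrimary() {
        return true;
      }
    }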
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information

@@ -84,19 +84,18 @@ public class UnassignRegionHandler extends EventHandler {
         // reportRegionStateTransition, so the HMaster will think the region is online, before we
         // actually open the region, as reportRegionStateTransition is part of the opening process.
         long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
-        LOG.warn("Received CLOSE for the region: {}, which we are already " +
-          "trying to OPEN. try again after {}ms", encodedName, backoff);
+        LOG.warn("Received CLOSE for {} which we are already " +
+          "trying to OPEN; try again after {}ms", encodedName, backoff);
         rs.getExecutorService().delayedSubmit(this, backoff, TimeUnit.MILLISECONDS);
       } else {
-        LOG.info("Received CLOSE for the region: {}, which we are already trying to CLOSE," +
+        LOG.info("Received CLOSE for {} which we are already trying to CLOSE," +
           " but not completed yet", encodedName);
       }
       return;
     }
     HRegion region = rs.getRegion(encodedName);
     if (region == null) {
-      LOG.debug(
-        "Received CLOSE for a region {} which is not online, and we're not opening/closing.",
+      LOG.debug("Received CLOSE for {} which is not ONLINE and we're not opening/closing.",
         encodedName);
       rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
       return;

@@ -114,10 +113,11 @@ public class UnassignRegionHandler extends EventHandler {
     if (region.close(abort) == null) {
       // XXX: Is this still possible? The old comment says about split, but now split is done at
       // master side, so...
-      LOG.warn("Can't close region {}, was already closed during close()", regionName);
+      LOG.warn("Can't close {} already closed during close()", regionName);
       rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
       return;
     }
+
     rs.removeRegion(region, destination);
     if (!rs.reportRegionStateTransition(
       new RegionStateTransitionContext(TransitionCode.CLOSED, HConstants.NO_SEQNUM, closeProcId,
@@ -412,14 +412,14 @@ public class ProtobufLogReader extends ReaderBase {
           + "because originalPosition is negative. last offset={}", this.inputStream.getPos(), eof);
         throw eof;
       }
-      // If stuck at the same place and we got and exception, lets go back at the beginning.
+      // If stuck at the same place and we got an exception, lets go back at the beginning.
       if (inputStream.getPos() == originalPosition) {
         if (resetPosition) {
           LOG.warn("Encountered a malformed edit, seeking to the beginning of the WAL since "
             + "current position and original position match at {}", originalPosition);
           seekOnFs(0);
         } else {
-          LOG.debug("Reached the end of file at position {}", originalPosition);
+          LOG.debug("EOF at position {}", originalPosition);
         }
       } else {
         // Else restore our position to original location in hope that next time through we will
@@ -38,7 +38,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hadoop.conf.Configuration;

@@ -249,34 +248,35 @@ public class ReplicationSource implements ReplicationSourceInterface {
       LOG.trace("NOT replicating {}", wal);
       return;
     }
-    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
-    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
+    // Use WAL prefix as the WALGroupId for this peer.
+    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
+    PriorityBlockingQueue<Path> queue = queues.get(walPrefix);
     if (queue == null) {
       queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
       // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
       // the shipper may quit immediately
       queue.put(wal);
-      queues.put(logPrefix, queue);
+      queues.put(walPrefix, queue);
       if (this.isSourceActive() && this.walEntryFilter != null) {
         // new wal group observed after source startup, start a new worker thread to track it
         // notice: it's possible that wal enqueued when this.running is set but worker thread
         // still not launched, so it's necessary to check workerThreads before start the worker
-        tryStartNewShipper(logPrefix, queue);
+        tryStartNewShipper(walPrefix, queue);
       }
     } else {
       queue.put(wal);
     }
     if (LOG.isTraceEnabled()) {
-      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), logPrefix,
+      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
         this.replicationQueueInfo.getQueueId());
     }
     this.metrics.incrSizeOfLogQueue();
     // This will wal a warning for each new wal that gets created above the warn threshold
     int queueSize = queue.size();
     if (queueSize > this.logQueueWarnThreshold) {
-      LOG.warn("{} WAL group {} queue size: {} exceeds value of "
-        + "replication.source.log.queue.warn: {}", logPeerId(),
-        logPrefix, queueSize, logQueueWarnThreshold);
+      LOG.warn("{} WAL group {} queue size: {} exceeds value of " +
+        "replication.source.log.queue.warn {}", logPeerId(), walPrefix, queueSize,
+        logQueueWarnThreshold);
     }
   }

@@ -372,16 +372,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
   private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
     workerThreads.compute(walGroupId, (key, value) -> {
       if (value != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(
-            "{} Someone has beat us to start a worker thread for wal group {}",
-            logPeerId(), key);
-        }
+        LOG.debug("{} preempted start of worker walGroupId={}", logPeerId(), walGroupId);
         return value;
       } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{} Starting up worker for wal group {}", logPeerId(), key);
-        }
+        LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId);
         ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
         ReplicationSourceWALReader walReader =
           createNewWALReader(walGroupId, queue, worker.getStartPosition());

@@ -457,8 +451,7 @@ public class ReplicationSource implements ReplicationSourceInterface {

   /**
    * Call after {@link #initializeWALEntryFilter(UUID)} else it will be null.
-   * @return The WAL Entry Filter Chain this ReplicationSource will use on WAL files filtering
-   *   out WALEntry edits.
+   * @return WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
    */
   @VisibleForTesting
   WALEntryFilter getWalEntryFilter() {

@@ -610,7 +603,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
       this.startupOngoing.set(false);
       throw new IllegalStateException("Source should be active.");
     }
-    LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
+    LOG.info("{} queueId={} is replicating from cluster={} to cluster={}",
       logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);

     initializeWALEntryFilter(peerClusterId);

@@ -625,10 +618,13 @@ public class ReplicationSource implements ReplicationSourceInterface {

   @Override
   public void startup() {
+    if (this.sourceRunning) {
+      return;
+    }
+    this.sourceRunning = true;
     //Flag that signalizes uncaught error happening while starting up the source
     // and a retry should be attempted
     MutableBoolean retryStartup = new MutableBoolean(true);
-    this.sourceRunning = true;
     do {
       if(retryStartup.booleanValue()) {
         retryStartup.setValue(false);

@@ -661,7 +657,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
     terminate(reason, cause, clearMetrics, true);
   }

-  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
+  public void terminate(String reason, Exception cause, boolean clearMetrics,
+    boolean join) {
     if (cause == null) {
       LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
     } else {

@@ -855,6 +852,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
   }

   private String logPeerId(){
-    return "[Source for peer " + this.getPeer().getId() + "]:";
+    return "peerId=" + this.getPeerId() + ",";
   }
 }
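The enqueueLog hunk above renames logPrefix to walPrefix and uses the WAL file-name prefix as the WAL group id keying the per-group queue map. A rough standalone sketch of that grouping idea (field and method names are made up; the real prefix parsing lives in AbstractFSWALProvider):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.PriorityBlockingQueue;

    final class WalGroupQueuesSketch {
      // One queue of WAL names per WAL group; the file-name prefix identifies the group.
      private final Map<String, PriorityBlockingQueue<String>> queues = new ConcurrentHashMap<>();

      void enqueue(String walName) {
        // Hypothetical prefix extraction: strip a trailing ".<timestamp>" suffix if present.
        int dot = walName.lastIndexOf('.');
        String walPrefix = dot > 0 ? walName.substring(0, dot) : walName;
        PriorityBlockingQueue<String> queue = queues.get(walPrefix);
        if (queue == null) {
          queue = new PriorityBlockingQueue<>();
          // Enqueue before publishing the queue so a newly started shipper never sees it empty.
          queue.put(walName);
          queues.put(walPrefix, queue);
          // ...the real code would start a shipper/reader thread for this new group here...
        } else {
          queue.put(walName);
        }
      }
    }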
@@ -174,7 +174,7 @@ class WALEntryStream implements Closeable {
   private void tryAdvanceEntry() throws IOException {
     if (checkReader()) {
       boolean beingWritten = readNextEntryAndRecordReaderPosition();
-      LOG.trace("reading wal file {}. Current open for write: {}", this.currentPath, beingWritten);
+      LOG.trace("Reading WAL {}; currently open for write={}", this.currentPath, beingWritten);
       if (currentEntry == null && !beingWritten) {
         // no more entries in this log file, and the file is already closed, i.e, rolled
         // Before dequeueing, we should always get one more attempt at reading.

@@ -222,7 +222,7 @@ class WALEntryStream implements Closeable {
       if (currentPositionOfReader < stat.getLen()) {
         final long skippedBytes = stat.getLen() - currentPositionOfReader;
         LOG.debug(
-          "Reached the end of WAL file '{}'. It was not closed cleanly," +
+          "Reached the end of WAL {}. It was not closed cleanly," +
             " so we did not parse {} bytes of data. This is normally ok.",
           currentPath, skippedBytes);
         metrics.incrUncleanlyClosedWALs();

@@ -230,7 +230,7 @@ class WALEntryStream implements Closeable {
       }
     } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
       LOG.warn(
-        "Processing end of WAL file '{}'. At position {}, which is too far away from" +
+        "Processing end of WAL {} at position {}, which is too far away from" +
           " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
         currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
       setPosition(0);

@@ -241,7 +241,7 @@ class WALEntryStream implements Closeable {
       }
     }
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " +
+      LOG.trace("Reached the end of " + this.currentPath + " and length of the file is " +
         (stat == null ? "N/A" : stat.getLen()));
     }
     metrics.incrCompletedWAL();

@@ -249,7 +249,7 @@ class WALEntryStream implements Closeable {
   }

   private void dequeueCurrentLog() throws IOException {
-    LOG.debug("Reached the end of log {}", currentPath);
+    LOG.debug("EOF, closing {}", currentPath);
     closeReader();
     logQueue.remove();
     setPosition(0);

@@ -264,7 +264,7 @@ class WALEntryStream implements Closeable {
     long readerPos = reader.getPosition();
     OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
     if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
-      // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
+      // See HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
       // data, so we need to make sure that we do not read beyond the committed file length.
       if (LOG.isDebugEnabled()) {
         LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
@@ -221,7 +221,7 @@ public final class MetaTableLocator {
       LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required");
       return;
     }
-    LOG.info("Setting hbase:meta (replicaId={}) location in ZooKeeper as {}, state={}", replicaId,
+    LOG.info("Setting hbase:meta replicaId={} location in ZooKeeper as {}, state={}", replicaId,
       serverName, state);
     // Make the MetaRegionServer pb and then get its bytes and save this as
     // the znode content.

@@ -235,9 +235,9 @@ public final class MetaTableLocator {
         zookeeper.getZNodePaths().getZNodeForReplica(replicaId), data);
     } catch(KeeperException.NoNodeException nne) {
       if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
-        LOG.debug("META region location doesn't exist, create it");
+        LOG.debug("hbase:meta region location doesn't exist, create it");
       } else {
-        LOG.debug("META region location doesn't exist for replicaId=" + replicaId +
+        LOG.debug("hbase:meta region location doesn't exist for replicaId=" + replicaId +
           ", create it");
       }
       ZKUtil.createAndWatch(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId),