HBASE-25067 Edit of log messages around async WAL Replication; checkstyle fixes; and a bugfix

Editing logging around region replicas: shortening and adding context.
Checkstyle fixes in edited files while I was in there.
Bug fix in AssignRegionHandler -- was using M_RS_CLOSE_META to open
a Region instead of a M_RS_OPEN_META.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
stack 2020-09-18 12:36:23 -07:00
parent d638ec26ff
commit 97979436f8
12 changed files with 84 additions and 83 deletions

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -49,7 +49,7 @@ public class ReplicationPeerImpl implements ReplicationPeer {
ReplicationPeerConfig peerConfig) {
this.conf = conf;
this.id = id;
this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
setPeerState(peerState);
this.peerConfig = peerConfig;
this.peerConfigListeners = new ArrayList<>();
}

View File

@ -1765,7 +1765,7 @@ public class HMaster extends HRegionServer implements MasterServices {
toPrint = regionsInTransition.subList(0, max);
truncated = true;
}
LOG.info(prefix + "unning balancer because " + regionsInTransition.size() +
LOG.info(prefix + " not running balancer because " + regionsInTransition.size() +
" region(s) in transition: " + toPrint + (truncated? "(truncated list)": ""));
if (!force || metaInTransition) return false;
}

View File

@ -141,9 +141,9 @@ public class EnableTableProcedure
}
} else {
// the replicasFound is less than the regionReplication
LOG.info("Number of replicas has increased. Assigning new region replicas." +
LOG.info("Number of replicas has increased for {}. Assigning new region replicas." +
"The previous replica count was {}. The current replica count is {}.",
(currentMaxReplica + 1), configuredReplicaCount);
this.tableName, (currentMaxReplica + 1), configuredReplicaCount);
regionsOfTable = RegionReplicaUtil.addReplicas(regionsOfTable,
currentMaxReplica + 1, configuredReplicaCount);
}

View File

@ -2408,11 +2408,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
status.setStatus("Acquiring readlock on region");
// block waiting for the lock for flushing cache
lock.readLock().lock();
boolean flushed = true;
try {
if (this.closed.get()) {
String msg = "Skipping flush on " + this + " because closed";
LOG.debug(msg);
status.abort(msg);
flushed = false;
return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
}
if (coprocessorHost != null) {
@ -2429,15 +2431,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!writestate.flushing && writestate.writesEnabled) {
this.writestate.flushing = true;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("NOT flushing memstore for region " + this
+ ", flushing=" + writestate.flushing + ", writesEnabled="
+ writestate.writesEnabled);
}
String msg = "Not flushing since "
+ (writestate.flushing ? "already flushing"
: "writes not enabled");
String msg = "NOT flushing " + this + " as " + (writestate.flushing ? "already flushing"
: "writes are not enabled");
LOG.debug(msg);
status.abort(msg);
flushed = false;
return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
}
}
@ -2475,8 +2473,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
} finally {
lock.readLock().unlock();
if (flushed) {
// Don't log this journal stuff if no flush -- confusing.
LOG.debug("Flush status journal for {}:\n{}", this.getRegionInfo().getEncodedName(),
status.prettyPrintJournal());
}
status.cleanup();
}
}
@ -5003,7 +5004,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public void setReadsEnabled(boolean readsEnabled) {
if (readsEnabled && !this.writestate.readsEnabled) {
LOG.info(getRegionInfo().getEncodedName() + " : Enabling reads for region.");
LOG.info("Enabling reads for {}", getRegionInfo().getEncodedName());
}
this.writestate.setReadsEnabled(readsEnabled);
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -119,10 +119,11 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
@ -139,8 +140,6 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@ -161,7 +160,6 @@ import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.Pair;
@ -2224,9 +2222,9 @@ public class HRegionServer extends Thread implements
this.walRoller.addWAL(wal);
}
return wal;
}catch (FailedCloseWALAfterInitializedErrorException ex) {
} catch (FailedCloseWALAfterInitializedErrorException ex) {
// see HBASE-21751 for details
abort("wal can not clean up after init failed", ex);
abort("WAL can not clean up after init failed", ex);
throw ex;
}
}
@ -2467,10 +2465,13 @@ public class HRegionServer extends Thread implements
region.setReadsEnabled(false); // disable reads before marking the region as opened.
// RegionReplicaFlushHandler might reset this.
// submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
// Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
if (this.executorService != null) {
this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
} else {
LOG.info("Executor is null; not running flush of primary region replica for {}",
region.getRegionInfo());
}
}

View File

@ -92,7 +92,7 @@ public class AssignRegionHandler extends EventHandler {
String regionName = regionInfo.getRegionNameAsString();
Region onlineRegion = rs.getRegion(encodedName);
if (onlineRegion != null) {
LOG.warn("Received OPEN for the region:{}, which is already online", regionName);
LOG.warn("Received OPEN for {} which is already online", regionName);
// Just follow the old behavior, do we need to call reportRegionStateTransition? Maybe not?
// For normal case, it could happen that the rpc call to schedule this handler is succeeded,
// but before returning to master the connection is broken. And when master tries again, we
@ -104,7 +104,7 @@ public class AssignRegionHandler extends EventHandler {
if (previous != null) {
if (previous) {
// The region is opening and this maybe a retry on the rpc call, it is safe to ignore it.
LOG.info("Receiving OPEN for the region:{}, which we are already trying to OPEN" +
LOG.info("Receiving OPEN for {} which we are already trying to OPEN" +
" - ignoring this new request for this region.", regionName);
} else {
// The region is closing. This is possible as we will update the region state to CLOSED when
@ -113,7 +113,7 @@ public class AssignRegionHandler extends EventHandler {
// closing process.
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.info(
"Receiving OPEN for the region:{}, which we are trying to close, try again after {}ms",
"Receiving OPEN for {} which we are trying to close, try again after {}ms",
regionName, backoff);
rs.getExecutorService().delayedSubmit(this, backoff, TimeUnit.MILLISECONDS);
}
@ -145,11 +145,10 @@ public class AssignRegionHandler extends EventHandler {
Boolean current = rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes());
if (current == null) {
// Should NEVER happen, but let's be paranoid.
LOG.error("Bad state: we've just opened a region that was NOT in transition. Region={}",
regionName);
LOG.error("Bad state: we've just opened {} which was NOT in transition", regionName);
} else if (!current) {
// Should NEVER happen, but let's be paranoid.
LOG.error("Bad state: we've just opened a region that was closing. Region={}", regionName);
LOG.error("Bad state: we've just opened {} which was closing", regionName);
}
}
@ -168,7 +167,7 @@ public class AssignRegionHandler extends EventHandler {
long openProcId, TableDescriptor tableDesc, long masterSystemTime) {
EventType eventType;
if (regionInfo.isMetaRegion()) {
eventType = EventType.M_RS_CLOSE_META;
eventType = EventType.M_RS_OPEN_META;
} else if (regionInfo.getTable().isSystemTable() ||
(tableDesc != null && tableDesc.getPriority() >= HConstants.ADMIN_QOS)) {
eventType = EventType.M_RS_OPEN_PRIORITY_REGION;

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver.handler;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
@ -42,9 +41,9 @@ import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
/**
* HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in
* HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to WAL in
* secondary region replicas. This means that a secondary region replica can serve some edits from
* it's memstore that that is still not flushed from primary. We do not want to allow secondary
* it's memstore that are still not flushed from primary. We do not want to allow secondary
* region's seqId to go back in time, when this secondary region is opened elsewhere after a
* crash or region move. We will trigger a flush cache in the primary region replica and wait
* for observing a complete flush cycle before marking the region readsEnabled. This handler does
@ -53,7 +52,6 @@ import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
*/
@InterfaceAudience.Private
public class RegionReplicaFlushHandler extends EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class);
private final ClusterConnection connection;
@ -83,7 +81,7 @@ public class RegionReplicaFlushHandler extends EventHandler {
if (t instanceof InterruptedIOException || t instanceof InterruptedException) {
LOG.error("Caught throwable while processing event " + eventType, t);
} else if (t instanceof RuntimeException) {
server.abort("ServerAborting because a runtime exception was thrown", t);
server.abort("Server aborting", t);
} else {
// something fishy since we cannot flush the primary region until all retries (retries from
// rpc times 35 trigger). We cannot close the region since there is no such mechanism to
@ -111,9 +109,9 @@ public class RegionReplicaFlushHandler extends EventHandler {
RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create();
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting to do an RPC to the primary region replica " + ServerRegionReplicaUtil
.getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region "
+ region.getRegionInfo().getEncodedName() + " to trigger a flush");
LOG.debug("RPC'ing to primary region replica " +
ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) + " from " +
region.getRegionInfo() + " to trigger FLUSH");
}
while (!region.isClosing() && !region.isClosed()
&& !server.isAborted() && !server.isStopped()) {
@ -139,11 +137,11 @@ public class RegionReplicaFlushHandler extends EventHandler {
// then we have to wait for seeing the flush entry. All reads will be rejected until we see
// a complete flush cycle or replay a region open event
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully triggered a flush of primary region replica "
LOG.debug("Triggered flush of primary region replica "
+ ServerRegionReplicaUtil
.getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
+ " of region " + region.getRegionInfo().getEncodedName()
+ " Now waiting and blocking reads until observing a full flush cycle");
+ " for " + region.getRegionInfo().getEncodedName()
+ "; now waiting and blocking reads until completes a full flush cycle");
}
region.setReadsEnabled(true);
break;
@ -151,10 +149,10 @@ public class RegionReplicaFlushHandler extends EventHandler {
if (response.hasWroteFlushWalMarker()) {
if(response.getWroteFlushWalMarker()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary "
LOG.debug("Triggered empty flush marker (memstore empty) on primary "
+ "region replica " + ServerRegionReplicaUtil
.getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
+ " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and "
+ " for " + region.getRegionInfo().getEncodedName() + "; now waiting and "
+ "blocking reads until observing a flush marker");
}
region.setReadsEnabled(true);

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -84,19 +84,18 @@ public class UnassignRegionHandler extends EventHandler {
// reportRegionStateTransition, so the HMaster will think the region is online, before we
// actually open the region, as reportRegionStateTransition is part of the opening process.
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Received CLOSE for the region: {}, which we are already " +
"trying to OPEN. try again after {}ms", encodedName, backoff);
LOG.warn("Received CLOSE for {} which we are already " +
"trying to OPEN; try again after {}ms", encodedName, backoff);
rs.getExecutorService().delayedSubmit(this, backoff, TimeUnit.MILLISECONDS);
} else {
LOG.info("Received CLOSE for the region: {}, which we are already trying to CLOSE," +
LOG.info("Received CLOSE for {} which we are already trying to CLOSE," +
" but not completed yet", encodedName);
}
return;
}
HRegion region = rs.getRegion(encodedName);
if (region == null) {
LOG.debug(
"Received CLOSE for a region {} which is not online, and we're not opening/closing.",
LOG.debug("Received CLOSE for {} which is not ONLINE and we're not opening/closing.",
encodedName);
rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
return;
@ -114,10 +113,11 @@ public class UnassignRegionHandler extends EventHandler {
if (region.close(abort) == null) {
// XXX: Is this still possible? The old comment says about split, but now split is done at
// master side, so...
LOG.warn("Can't close region {}, was already closed during close()", regionName);
LOG.warn("Can't close {} already closed during close()", regionName);
rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
return;
}
rs.removeRegion(region, destination);
if (!rs.reportRegionStateTransition(
new RegionStateTransitionContext(TransitionCode.CLOSED, HConstants.NO_SEQNUM, closeProcId,

View File

@ -412,14 +412,14 @@ public class ProtobufLogReader extends ReaderBase {
+ "because originalPosition is negative. last offset={}", this.inputStream.getPos(), eof);
throw eof;
}
// If stuck at the same place and we got and exception, lets go back at the beginning.
// If stuck at the same place and we got an exception, lets go back at the beginning.
if (inputStream.getPos() == originalPosition) {
if (resetPosition) {
LOG.warn("Encountered a malformed edit, seeking to the beginning of the WAL since "
+ "current position and original position match at {}", originalPosition);
seekOnFs(0);
} else {
LOG.debug("Reached the end of file at position {}", originalPosition);
LOG.debug("EOF at position {}", originalPosition);
}
} else {
// Else restore our position to original location in hope that next time through we will

View File

@ -237,21 +237,22 @@ public class ReplicationSource implements ReplicationSourceInterface {
LOG.trace("NOT replicating {}", wal);
return;
}
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
// Use WAL prefix as the WALGroupId for this peer.
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
PriorityBlockingQueue<Path> queue = queues.get(walPrefix);
if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
queues.put(logPrefix, queue);
queues.put(walPrefix, queue);
if (this.isSourceActive() && this.walEntryFilter != null) {
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that wal enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
tryStartNewShipper(logPrefix, queue);
tryStartNewShipper(walPrefix, queue);
}
}
queue.put(wal);
if (LOG.isTraceEnabled()) {
LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), logPrefix,
LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
this.replicationQueueInfo.getQueueId());
}
this.metrics.incrSizeOfLogQueue();
@ -260,7 +261,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
if (queueSize > this.logQueueWarnThreshold) {
LOG.warn("{} WAL group {} queue size: {} exceeds value of "
+ "replication.source.log.queue.warn: {}", logPeerId(),
logPrefix, queueSize, logQueueWarnThreshold);
walPrefix, queueSize, logQueueWarnThreshold);
}
}
@ -357,14 +358,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
if(LOG.isDebugEnabled()) {
LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(),
walGroupId);
}
LOG.debug("{} preempted start of worker walGroupId={}", logPeerId(), walGroupId);
} else {
if(LOG.isDebugEnabled()) {
LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId);
}
LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId);
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, queue, worker.getStartPosition());
Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
@ -435,8 +431,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
/**
* Call after {@link #initializeWALEntryFilter(UUID)} else it will be null.
* @return The WAL Entry Filter Chain this ReplicationSource will use on WAL files filtering
* out WALEntry edits.
* @return WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
*/
@VisibleForTesting
WALEntryFilter getWalEntryFilter() {
@ -575,7 +570,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
if (!this.isSourceActive()) {
return;
}
LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
LOG.info("{} queueId={} is replicating from cluster={} to cluster={}",
logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
initializeWALEntryFilter(peerClusterId);
@ -589,7 +584,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
@Override
public void startup() {
// mark we are running now
if (this.sourceRunning) {
return;
}
// Mark we are running now
this.sourceRunning = true;
initThread = new Thread(this::initialize);
Threads.setDaemonThreadRunning(initThread,
@ -612,7 +610,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
terminate(reason, cause, clearMetrics, true);
}
public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
public void terminate(String reason, Exception cause, boolean clearMetrics,
boolean join) {
if (cause == null) {
LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
} else {
@ -798,7 +797,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
return queueStorage;
}
/**
* @return String to use as a log prefix that contains current peerId.
*/
private String logPeerId(){
return "[Source for peer " + this.getPeerId() + "]:";
return "peerId=" + this.getPeerId() + ",";
}
}

View File

@ -174,7 +174,7 @@ class WALEntryStream implements Closeable {
private void tryAdvanceEntry() throws IOException {
if (checkReader()) {
boolean beingWritten = readNextEntryAndRecordReaderPosition();
LOG.trace("reading wal file {}. Current open for write: {}", this.currentPath, beingWritten);
LOG.trace("Reading WAL {}; currently open for write={}", this.currentPath, beingWritten);
if (currentEntry == null && !beingWritten) {
// no more entries in this log file, and the file is already closed, i.e, rolled
// Before dequeueing, we should always get one more attempt at reading.
@ -222,7 +222,7 @@ class WALEntryStream implements Closeable {
if (currentPositionOfReader < stat.getLen()) {
final long skippedBytes = stat.getLen() - currentPositionOfReader;
LOG.debug(
"Reached the end of WAL file '{}'. It was not closed cleanly," +
"Reached the end of WAL {}. It was not closed cleanly," +
" so we did not parse {} bytes of data. This is normally ok.",
currentPath, skippedBytes);
metrics.incrUncleanlyClosedWALs();
@ -230,7 +230,7 @@ class WALEntryStream implements Closeable {
}
} else if (currentPositionOfReader + trailerSize < stat.getLen()) {
LOG.warn(
"Processing end of WAL file '{}'. At position {}, which is too far away from" +
"Processing end of WAL {} at position {}, which is too far away from" +
" reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
setPosition(0);
@ -241,7 +241,7 @@ class WALEntryStream implements Closeable {
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " +
LOG.trace("Reached the end of " + this.currentPath + " and length of the file is " +
(stat == null ? "N/A" : stat.getLen()));
}
metrics.incrCompletedWAL();
@ -249,7 +249,7 @@ class WALEntryStream implements Closeable {
}
private void dequeueCurrentLog() throws IOException {
LOG.debug("Reached the end of log {}", currentPath);
LOG.debug("EOF, closing {}", currentPath);
closeReader();
logQueue.remove();
setPosition(0);
@ -264,7 +264,7 @@ class WALEntryStream implements Closeable {
long readerPos = reader.getPosition();
OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
// see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
// See HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
// data, so we need to make sure that we do not read beyond the committed file length.
if (LOG.isDebugEnabled()) {
LOG.debug("The provider tells us the valid length for " + currentPath + " is " +

View File

@ -221,7 +221,7 @@ public final class MetaTableLocator {
LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required");
return;
}
LOG.info("Setting hbase:meta (replicaId={}) location in ZooKeeper as {}, state={}", replicaId,
LOG.info("Setting hbase:meta replicaId={} location in ZooKeeper as {}, state={}", replicaId,
serverName, state);
// Make the MetaRegionServer pb and then get its bytes and save this as
// the znode content.
@ -235,9 +235,9 @@ public final class MetaTableLocator {
zookeeper.getZNodePaths().getZNodeForReplica(replicaId), data);
} catch(KeeperException.NoNodeException nne) {
if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
LOG.debug("META region location doesn't exist, create it");
LOG.debug("hbase:meta region location doesn't exist, create it");
} else {
LOG.debug("META region location doesn't exist for replicaId=" + replicaId +
LOG.debug("hbase:meta region location doesn't exist for replicaId=" + replicaId +
", create it");
}
ZKUtil.createAndWatch(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId),