diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
index d6564665721..5e33aa90b27 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -49,7 +49,7 @@ public class ReplicationPeerImpl implements ReplicationPeer {
       ReplicationPeerConfig peerConfig) {
     this.conf = conf;
     this.id = id;
-    this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
+    setPeerState(peerState);
     this.peerConfig = peerConfig;
     this.peerConfigListeners = new ArrayList<>();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index ffd5774834c..d0425889039 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1765,7 +1765,7 @@ public class HMaster extends HRegionServer implements MasterServices {
       toPrint = regionsInTransition.subList(0, max);
       truncated = true;
     }
-    LOG.info(prefix + "unning balancer because " + regionsInTransition.size() +
+    LOG.info(prefix + " not running balancer because " + regionsInTransition.size() +
       " region(s) in transition: " + toPrint + (truncated? "(truncated list)": ""));
     if (!force || metaInTransition) return false;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
index 99d5a274929..4ac887f40e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
@@ -141,9 +141,9 @@ public class EnableTableProcedure
         }
       } else {
         // the replicasFound is less than the regionReplication
-        LOG.info("Number of replicas has increased. Assigning new region replicas." +
+        LOG.info("Number of replicas has increased for {}. Assigning new region replicas. " +
          "The previous replica count was {}. The current replica count is {}.",
-          (currentMaxReplica + 1), configuredReplicaCount);
+          this.tableName, (currentMaxReplica + 1), configuredReplicaCount);
         regionsOfTable = RegionReplicaUtil.addReplicas(regionsOfTable,
           currentMaxReplica + 1, configuredReplicaCount);
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9babc65f88e..9537dda9b2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2408,11 +2408,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     status.setStatus("Acquiring readlock on region");
     // block waiting for the lock for flushing cache
     lock.readLock().lock();
+    boolean flushed = true;
     try {
       if (this.closed.get()) {
         String msg = "Skipping flush on " + this + " because closed";
         LOG.debug(msg);
         status.abort(msg);
+        flushed = false;
         return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
       }
       if (coprocessorHost != null) {
@@ -2429,15 +2431,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (!writestate.flushing && writestate.writesEnabled) {
           this.writestate.flushing = true;
         } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("NOT flushing memstore for region " + this
-              + ", flushing=" + writestate.flushing + ", writesEnabled="
-              + writestate.writesEnabled);
-          }
-          String msg = "Not flushing since "
-            + (writestate.flushing ? "already flushing" : "writes not enabled");
+          String msg = "NOT flushing " + this + " as " + (writestate.flushing ? "already flushing"
+            : "writes are not enabled");
+          LOG.debug(msg);
           status.abort(msg);
+          flushed = false;
           return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
         }
       }
@@ -2475,8 +2473,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     } finally {
       lock.readLock().unlock();
-      LOG.debug("Flush status journal for {}:\n{}", this.getRegionInfo().getEncodedName(),
-        status.prettyPrintJournal());
+      if (flushed) {
+        // Don't log this journal stuff if no flush -- confusing.
+        LOG.debug("Flush status journal for {}:\n{}", this.getRegionInfo().getEncodedName(),
+          status.prettyPrintJournal());
+      }
       status.cleanup();
     }
   }
@@ -5003,7 +5004,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public void setReadsEnabled(boolean readsEnabled) {
     if (readsEnabled && !this.writestate.readsEnabled) {
-      LOG.info(getRegionInfo().getEncodedName() + " : Enabling reads for region.");
+      LOG.info("Enabling reads for {}", getRegionInfo().getEncodedName());
     }
     this.writestate.setReadsEnabled(readsEnabled);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 11d65599f54..a80881e5689 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -119,10 +119,11 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LoadBalancer;
-import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.mob.MobFileCache;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
+import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
 import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
@@ -139,8 +140,6 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
-import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
-import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
 import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@@ -161,7 +160,6 @@ import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
 import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
 import org.apache.hadoop.hbase.util.Pair;
@@ -2224,9 +2222,9 @@ public class HRegionServer extends Thread implements
         this.walRoller.addWAL(wal);
       }
       return wal;
-    }catch (FailedCloseWALAfterInitializedErrorException ex) {
+    } catch (FailedCloseWALAfterInitializedErrorException ex) {
      // see HBASE-21751 for details
-      abort("wal can not clean up after init failed", ex);
+      abort("WAL can not clean up after init failed", ex);
      throw ex;
    }
  }
@@ -2467,10 +2465,13 @@ public class HRegionServer extends Thread implements
     region.setReadsEnabled(false); // disable reads before marking the region as opened.
     // RegionReplicaFlushHandler might reset this.
-    // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
+    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
     if (this.executorService != null) {
       this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
-          rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
+        rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
+    } else {
+      LOG.info("Executor is null; not running flush of primary region replica for {}",
+        region.getRegionInfo());
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
index 3474bf916be..98d09b20e87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
@@ -92,7 +92,7 @@ public class AssignRegionHandler extends EventHandler {
     String regionName = regionInfo.getRegionNameAsString();
     Region onlineRegion = rs.getRegion(encodedName);
     if (onlineRegion != null) {
-      LOG.warn("Received OPEN for the region:{}, which is already online", regionName);
+      LOG.warn("Received OPEN for {} which is already online", regionName);
       // Just follow the old behavior, do we need to call reportRegionStateTransition? Maybe not?
       // For normal case, it could happen that the rpc call to schedule this handler is succeeded,
       // but before returning to master the connection is broken. And when master tries again, we
@@ -104,7 +104,7 @@
     if (previous != null) {
       if (previous) {
         // The region is opening and this maybe a retry on the rpc call, it is safe to ignore it.
-        LOG.info("Receiving OPEN for the region:{}, which we are already trying to OPEN" +
+        LOG.info("Receiving OPEN for {} which we are already trying to OPEN" +
           " - ignoring this new request for this region.", regionName);
       } else {
         // The region is closing. This is possible as we will update the region state to CLOSED when
@@ -113,7 +113,7 @@
         // closing process.
         long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
         LOG.info(
-          "Receiving OPEN for the region:{}, which we are trying to close, try again after {}ms",
+          "Receiving OPEN for {} which we are trying to close, try again after {}ms",
           regionName, backoff);
         rs.getExecutorService().delayedSubmit(this, backoff, TimeUnit.MILLISECONDS);
       }
@@ -145,11 +145,10 @@
     Boolean current = rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes());
     if (current == null) {
       // Should NEVER happen, but let's be paranoid.
-      LOG.error("Bad state: we've just opened a region that was NOT in transition. Region={}",
-        regionName);
+      LOG.error("Bad state: we've just opened {} which was NOT in transition", regionName);
     } else if (!current) {
       // Should NEVER happen, but let's be paranoid.
-      LOG.error("Bad state: we've just opened a region that was closing. Region={}", regionName);
Region={}", regionName); + LOG.error("Bad state: we've just opened {} which was closing", regionName); } } @@ -168,7 +167,7 @@ public class AssignRegionHandler extends EventHandler { long openProcId, TableDescriptor tableDesc, long masterSystemTime) { EventType eventType; if (regionInfo.isMetaRegion()) { - eventType = EventType.M_RS_CLOSE_META; + eventType = EventType.M_RS_OPEN_META; } else if (regionInfo.getTable().isSystemTable() || (tableDesc != null && tableDesc.getPriority() >= HConstants.ADMIN_QOS)) { eventType = EventType.M_RS_OPEN_PRIORITY_REGION; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java index 81b6d7e0a9f..a20443ce646 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver.handler; import java.io.IOException; import java.io.InterruptedIOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; @@ -42,9 +41,9 @@ import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; /** - * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in + * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to WAL in * secondary region replicas. This means that a secondary region replica can serve some edits from - * it's memstore that that is still not flushed from primary. We do not want to allow secondary + * it's memstore that are still not flushed from primary. We do not want to allow secondary * region's seqId to go back in time, when this secondary region is opened elsewhere after a * crash or region move. We will trigger a flush cache in the primary region replica and wait * for observing a complete flush cycle before marking the region readsEnabled. This handler does @@ -53,7 +52,6 @@ import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; */ @InterfaceAudience.Private public class RegionReplicaFlushHandler extends EventHandler { - private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class); private final ClusterConnection connection; @@ -83,7 +81,7 @@ public class RegionReplicaFlushHandler extends EventHandler { if (t instanceof InterruptedIOException || t instanceof InterruptedException) { LOG.error("Caught throwable while processing event " + eventType, t); } else if (t instanceof RuntimeException) { - server.abort("ServerAborting because a runtime exception was thrown", t); + server.abort("Server aborting", t); } else { // something fishy since we cannot flush the primary region until all retries (retries from // rpc times 35 trigger). 
@@ -111,9 +109,9 @@
     RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create();
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Attempting to do an RPC to the primary region replica " + ServerRegionReplicaUtil
-        .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region "
-        + region.getRegionInfo().getEncodedName() + " to trigger a flush");
+      LOG.debug("RPC'ing to primary region replica " +
+        ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) + " from " +
+        region.getRegionInfo() + " to trigger FLUSH");
     }
     while (!region.isClosing() && !region.isClosed()
       && !server.isAborted() && !server.isStopped()) {
@@ -139,11 +137,11 @@
         // then we have to wait for seeing the flush entry. All reads will be rejected until we see
         // a complete flush cycle or replay a region open event
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Successfully triggered a flush of primary region replica "
+          LOG.debug("Triggered flush of primary region replica "
             + ServerRegionReplicaUtil
              .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
-            + " of region " + region.getRegionInfo().getEncodedName()
-            + " Now waiting and blocking reads until observing a full flush cycle");
+            + " for " + region.getRegionInfo().getEncodedName()
+            + "; now waiting and blocking reads until a full flush cycle completes");
         }
         region.setReadsEnabled(true);
         break;
@@ -151,10 +149,10 @@
       if (response.hasWroteFlushWalMarker()) {
         if(response.getWroteFlushWalMarker()) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary "
+            LOG.debug("Triggered empty flush marker (memstore empty) on primary "
              + "region replica " + ServerRegionReplicaUtil
                .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
-            + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and "
+            + " for " + region.getRegionInfo().getEncodedName() + "; now waiting and "
              + "blocking reads until observing a flush marker");
           }
           region.setReadsEnabled(true);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
index 8b275d0e6ed..0bf2543a445 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -84,19 +84,18 @@
       // reportRegionStateTransition, so the HMaster will think the region is online, before we
       // actually open the region, as reportRegionStateTransition is part of the opening process.
       long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
-      LOG.warn("Received CLOSE for the region: {}, which we are already " +
-        "trying to OPEN. try again after {}ms", encodedName, backoff);
+      LOG.warn("Received CLOSE for {} which we are already " +
+        "trying to OPEN; try again after {}ms", encodedName, backoff);
       rs.getExecutorService().delayedSubmit(this, backoff, TimeUnit.MILLISECONDS);
     } else {
-      LOG.info("Received CLOSE for the region: {}, which we are already trying to CLOSE," +
+      LOG.info("Received CLOSE for {} which we are already trying to CLOSE," +
         " but not completed yet", encodedName);
     }
     return;
   }
   HRegion region = rs.getRegion(encodedName);
   if (region == null) {
-    LOG.debug(
-      "Received CLOSE for a region {} which is not online, and we're not opening/closing.",
+    LOG.debug("Received CLOSE for {} which is not ONLINE and we're not opening/closing.",
       encodedName);
     rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
     return;
@@ -114,10 +113,11 @@
     if (region.close(abort) == null) {
       // XXX: Is this still possible? The old comment says about split, but now split is done at
       // master side, so...
-      LOG.warn("Can't close region {}, was already closed during close()", regionName);
+      LOG.warn("Can't close {}; already closed during close()", regionName);
       rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
       return;
     }
+    rs.removeRegion(region, destination);
     if (!rs.reportRegionStateTransition(
       new RegionStateTransitionContext(TransitionCode.CLOSED, HConstants.NO_SEQNUM, closeProcId,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 6f537df9490..0967c101ce5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -412,14 +412,14 @@
         + "because originalPosition is negative. last offset={}", this.inputStream.getPos(), eof);
       throw eof;
     }
-    // If stuck at the same place and we got and exception, lets go back at the beginning.
+    // If stuck at the same place and we got an exception, let's go back to the beginning.
     if (inputStream.getPos() == originalPosition) {
       if (resetPosition) {
         LOG.warn("Encountered a malformed edit, seeking to the beginning of the WAL since "
           + "current position and original position match at {}", originalPosition);
         seekOnFs(0);
       } else {
-        LOG.debug("Reached the end of file at position {}", originalPosition);
+        LOG.debug("EOF at position {}", originalPosition);
       }
     } else {
       // Else restore our position to original location in hope that next time through we will
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index c495376c87f..0b7ed16cc96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -237,21 +237,22 @@
       LOG.trace("NOT replicating {}", wal);
       return;
     }
-    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
-    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
+    // Use WAL prefix as the WALGroupId for this peer.
+    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
+    PriorityBlockingQueue<Path> queue = queues.get(walPrefix);
     if (queue == null) {
       queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
-      queues.put(logPrefix, queue);
+      queues.put(walPrefix, queue);
       if (this.isSourceActive() && this.walEntryFilter != null) {
         // new wal group observed after source startup, start a new worker thread to track it
         // notice: it's possible that wal enqueued when this.running is set but worker thread
         // still not launched, so it's necessary to check workerThreads before start the worker
-        tryStartNewShipper(logPrefix, queue);
+        tryStartNewShipper(walPrefix, queue);
       }
     }
     queue.put(wal);
     if (LOG.isTraceEnabled()) {
-      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), logPrefix,
+      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
         this.replicationQueueInfo.getQueueId());
     }
     this.metrics.incrSizeOfLogQueue();
@@ -260,7 +261,7 @@
     if (queueSize > this.logQueueWarnThreshold) {
       LOG.warn("{} WAL group {} queue size: {} exceeds value of "
         + "replication.source.log.queue.warn: {}", logPeerId(),
-        logPrefix, queueSize, logQueueWarnThreshold);
+        walPrefix, queueSize, logQueueWarnThreshold);
     }
   }
@@ -357,14 +358,9 @@
     ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
     ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
     if (extant != null) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(),
-          walGroupId);
-      }
+      LOG.debug("{} preempted start of worker walGroupId={}", logPeerId(), walGroupId);
     } else {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId);
-      }
+      LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId);
       ReplicationSourceWALReader walReader =
         createNewWALReader(walGroupId, queue, worker.getStartPosition());
       Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
@@ -435,8 +431,7 @@
 
   /**
    * Call after {@link #initializeWALEntryFilter(UUID)} else it will be null.
-   * @return The WAL Entry Filter Chain this ReplicationSource will use on WAL files filtering
-   *   out WALEntry edits.
+   * @return WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
   */
  @VisibleForTesting
  WALEntryFilter getWalEntryFilter() {
@@ -575,7 +570,7 @@
     if (!this.isSourceActive()) {
       return;
     }
-    LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
+    LOG.info("{} queueId={} is replicating from cluster={} to cluster={}",
       logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
 
     initializeWALEntryFilter(peerClusterId);
@@ -589,7 +584,10 @@
 
   @Override
   public void startup() {
-    // mark we are running now
+    if (this.sourceRunning) {
+      return;
+    }
+    // Mark we are running now
     this.sourceRunning = true;
     initThread = new Thread(this::initialize);
     Threads.setDaemonThreadRunning(initThread,
@@ -612,7 +610,8 @@
     terminate(reason, cause, clearMetrics, true);
   }
 
-  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
+  public void terminate(String reason, Exception cause, boolean clearMetrics,
+      boolean join) {
     if (cause == null) {
       LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
     } else {
@@ -798,7 +797,10 @@
     return queueStorage;
   }
 
+  /**
+   * @return String to use as a log prefix that contains current peerId.
+   */
   private String logPeerId(){
-    return "[Source for peer " + this.getPeerId() + "]:";
+    return "peerId=" + this.getPeerId() + ",";
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index d2cb6f33c15..7e67f539b68 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -174,7 +174,7 @@ class WALEntryStream implements Closeable {
   private void tryAdvanceEntry() throws IOException {
     if (checkReader()) {
       boolean beingWritten = readNextEntryAndRecordReaderPosition();
-      LOG.trace("reading wal file {}. Current open for write: {}", this.currentPath, beingWritten);
+      LOG.trace("Reading WAL {}; currently open for write={}", this.currentPath, beingWritten);
       if (currentEntry == null && !beingWritten) {
         // no more entries in this log file, and the file is already closed, i.e, rolled
         // Before dequeueing, we should always get one more attempt at reading.
@@ -222,7 +222,7 @@
       if (currentPositionOfReader < stat.getLen()) {
         final long skippedBytes = stat.getLen() - currentPositionOfReader;
         LOG.debug(
-          "Reached the end of WAL file '{}'. It was not closed cleanly," +
+          "Reached the end of WAL {}. It was not closed cleanly," +
            " so we did not parse {} bytes of data. This is normally ok.",
          currentPath, skippedBytes);
         metrics.incrUncleanlyClosedWALs();
       }
     } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
       LOG.warn(
-        "Processing end of WAL file '{}'. At position {}, which is too far away from" +
+        "Processing end of WAL {} at position {}, which is too far away from" +
          " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
{}", currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat()); setPosition(0); @@ -241,7 +241,7 @@ class WALEntryStream implements Closeable { } } if (LOG.isTraceEnabled()) { - LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " + + LOG.trace("Reached the end of " + this.currentPath + " and length of the file is " + (stat == null ? "N/A" : stat.getLen())); } metrics.incrCompletedWAL(); @@ -249,7 +249,7 @@ class WALEntryStream implements Closeable { } private void dequeueCurrentLog() throws IOException { - LOG.debug("Reached the end of log {}", currentPath); + LOG.debug("EOF, closing {}", currentPath); closeReader(); logQueue.remove(); setPosition(0); @@ -264,7 +264,7 @@ class WALEntryStream implements Closeable { long readerPos = reader.getPosition(); OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath); if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) { - // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted + // See HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted // data, so we need to make sure that we do not read beyond the committed file length. if (LOG.isDebugEnabled()) { LOG.debug("The provider tells us the valid length for " + currentPath + " is " + diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java index bb02af3788a..557ba77c523 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java @@ -221,7 +221,7 @@ public final class MetaTableLocator { LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required"); return; } - LOG.info("Setting hbase:meta (replicaId={}) location in ZooKeeper as {}, state={}", replicaId, + LOG.info("Setting hbase:meta replicaId={} location in ZooKeeper as {}, state={}", replicaId, serverName, state); // Make the MetaRegionServer pb and then get its bytes and save this as // the znode content. @@ -235,9 +235,9 @@ public final class MetaTableLocator { zookeeper.getZNodePaths().getZNodeForReplica(replicaId), data); } catch(KeeperException.NoNodeException nne) { if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) { - LOG.debug("META region location doesn't exist, create it"); + LOG.debug("hbase:meta region location doesn't exist, create it"); } else { - LOG.debug("META region location doesn't exist for replicaId=" + replicaId + + LOG.debug("hbase:meta region location doesn't exist for replicaId=" + replicaId + ", create it"); } ZKUtil.createAndWatch(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId),