diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 81ed4b83cd3..6d8cfd920d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -198,11 +198,14 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi protected boolean sleepForRetries(String msg, int sleepMultiplier) { try { if (LOG.isTraceEnabled()) { - LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier); + LOG.trace("{} {}, sleeping {} times {}", + logPeerId(), msg, sleepForRetries, sleepMultiplier); } Thread.sleep(this.sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { - LOG.debug("Interrupted while sleeping between retries"); + if (LOG.isDebugEnabled()) { + LOG.debug("{} Interrupted while sleeping between retries", logPeerId()); + } } return sleepMultiplier < maxRetriesMultiplier; } @@ -288,7 +291,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent()); } catch (IOException ioe) { - LOG.warn("Failed to create connection for peer cluster", ioe); + LOG.warn("{} Failed to create connection for peer cluster", logPeerId(), ioe); } if (connection != null) { this.conn = connection; @@ -301,8 +304,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi for (int i = 0; i < batches.size(); i++) { List entries = batches.get(i); if (!entries.isEmpty()) { - LOG.trace("Submitting {} entries of total size {}", entries.size(), - replicateContext.getSize()); + if (LOG.isTraceEnabled()) { + LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(), + replicateContext.getSize()); + } // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource pool.submit(createReplicator(entries, i)); futures++; @@ -353,8 +358,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi int numSinks = replicationSinkMgr.getNumSinks(); if (numSinks == 0) { - LOG.warn("No replication sinks found, returning without replicating. The source should " + - "retry with the same set of edits."); + LOG.warn("{} No replication sinks found, returning without replicating. " + + "The source should retry with the same set of edits.", logPeerId()); return false; } @@ -376,7 +381,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } catch (IOException ioe) { if (ioe instanceof RemoteException) { ioe = ((RemoteException) ioe).unwrapRemoteException(); - LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe); + LOG.warn("{} Can't replicate because of an error on the remote cluster: ", logPeerId(), + ioe); if (ioe instanceof TableNotFoundException) { if (dropOnDeletedTables) { // this is a bit fragile, but cannot change how TNFE is serialized @@ -389,19 +395,20 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // Would potentially be better to retry in one of the outer loops // and add a table filter there; but that would break the encapsulation, // so we're doing the filtering here. - LOG.info("Missing table detected at sink, local table also does not exist, " + - "filtering edits for '" + table + "'"); + LOG.info("{} Missing table detected at sink, local table also does not " + + "exist, filtering edits for '{}'", logPeerId(), table); batches = filterBatches(batches, table); continue; } } catch (IOException iox) { - LOG.warn("Exception checking for local table: ", iox); + LOG.warn("{} Exception checking for local table: ", logPeerId(), iox); } } } // fall through and sleep below } else { - LOG.warn("Peer encountered RemoteException, rechecking all sinks: ", ioe); + LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(), + ioe); replicationSinkMgr.chooseSinks(); } } else { @@ -414,10 +421,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi "caused by a machine failure or a massive slowdown", this.socketTimeoutMultiplier); } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) { - LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe); + LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe); replicationSinkMgr.chooseSinks(); } else { - LOG.warn("Can't replicate because of a local or network error: ", ioe); + LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe); } } if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) { @@ -440,7 +447,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi this.conn.close(); this.conn = null; } catch (IOException e) { - LOG.warn("Failed to close the connection"); + LOG.warn("{} Failed to close the connection", logPeerId()); } } // Allow currently running replication tasks to finish @@ -466,8 +473,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi int entriesHashCode = System.identityHashCode(entries); if (LOG.isTraceEnabled()) { long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum(); - LOG.trace("Replicating batch {} of {} entries with total size {} bytes to {}", - entriesHashCode, entries.size(), size, replicationClusterId); + LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", + logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId); } sinkPeer = replicationSinkMgr.getReplicationSink(); AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer(); @@ -475,9 +482,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi ReplicationProtbufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir, hfileArchiveDir); - LOG.trace("Completed replicating batch {}", entriesHashCode); + if (LOG.isTraceEnabled()) { + LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode); + } } catch (IOException e) { - LOG.trace("Failed replicating batch {}", entriesHashCode, e); + if (LOG.isTraceEnabled()) { + LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e); + } throw e; } replicationSinkMgr.reportSinkSuccess(sinkPeer); @@ -515,4 +526,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex) : () -> replicateEntries(entries, batchIndex); } + + private String logPeerId(){ + return "[Source for peer " + this.ctx.getPeerId() + "]:"; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index b58fce300ca..7e36d625c02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -209,14 +209,17 @@ public class ReplicationSource implements ReplicationSourceInterface { } else { queue.put(log); } - LOG.trace("Added log file {} to queue of source {}.", logPrefix, + if (LOG.isTraceEnabled()) { + LOG.trace("{} Added log file {} to queue of source {}.", logPeerId(), logPrefix, this.replicationQueueInfo.getQueueId()); + } this.metrics.incrSizeOfLogQueue(); // This will log a warning for each new log that gets created above the warn threshold int queueSize = queue.size(); if (queueSize > this.logQueueWarnThreshold) { - LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize - + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold); + LOG.warn("{} WAL group {} queue size: {} exceeds value of " + + "replication.source.log.queue.warn: {}", logPeerId(), + logPrefix, queueSize, logQueueWarnThreshold); } } @@ -232,8 +235,8 @@ public class ReplicationSource implements ReplicationSourceInterface { this.queueStorage.addHFileRefs(peerId, pairs); metrics.incrSizeOfHFileRefsQueue(pairs.size()); } else { - LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family " - + Bytes.toString(family) + " to peer id " + peerId); + LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", + tableName, Bytes.toString(family), peerId); } } else { // user has explicitly not defined any table cfs for replication, means replicate all the @@ -305,9 +308,14 @@ public class ReplicationSource implements ReplicationSourceInterface { ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { - LOG.debug("Someone has beat us to start a worker thread for wal group {}", walGroupId); + if(LOG.isDebugEnabled()) { + LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(), + walGroupId); + } } else { - LOG.debug("Starting up worker for wal group {}", walGroupId); + if(LOG.isDebugEnabled()) { + LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId); + } ReplicationSourceWALReader walReader = createNewWALReader(walGroupId, queue, worker.getStartPosition()); Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() + @@ -337,7 +345,7 @@ public class ReplicationSource implements ReplicationSourceInterface { } } else { currentPath = new Path("NO_LOGS_IN_QUEUE"); - LOG.warn("No replication ongoing, waiting for new log"); + LOG.warn("{} No replication ongoing, waiting for new log", logPeerId()); } ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder(); statusBuilder.withPeerId(this.getPeerId()) @@ -378,7 +386,8 @@ public class ReplicationSource implements ReplicationSourceInterface { protected final void uncaughtException(Thread t, Throwable e) { RSRpcServices.exitIfOOME(e); - LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e); + LOG.error("Unexpected exception in {} currentPath={}", + t.getName(), getCurrentPath(), e); server.abort("Unexpected exception in " + t.getName(), e); } @@ -399,7 +408,7 @@ public class ReplicationSource implements ReplicationSourceInterface { long sleepTicks = throttler.getNextSleepInterval(batchSize); if (sleepTicks > 0) { if (LOG.isTraceEnabled()) { - LOG.trace("To sleep " + sleepTicks + "ms for throttling control"); + LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks); } Thread.sleep(sleepTicks); // reset throttler's cycle start tick when sleep for throttling occurs @@ -433,11 +442,14 @@ public class ReplicationSource implements ReplicationSourceInterface { protected boolean sleepForRetries(String msg, int sleepMultiplier) { try { if (LOG.isTraceEnabled()) { - LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier); + LOG.trace("{} {}, sleeping {} times {}", + logPeerId(), msg, sleepForRetries, sleepMultiplier); } Thread.sleep(this.sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { - LOG.debug("Interrupted while sleeping between retries"); + if(LOG.isDebugEnabled()) { + LOG.debug("{} Interrupted while sleeping between retries", logPeerId()); + } Thread.currentThread().interrupt(); } return sleepMultiplier < maxRetriesMultiplier; @@ -450,7 +462,7 @@ public class ReplicationSource implements ReplicationSourceInterface { try { replicationEndpoint = createReplicationEndpoint(); } catch (Exception e) { - LOG.warn("error creating ReplicationEndpoint, retry", e); + LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e); if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) { sleepMultiplier++; } @@ -462,7 +474,7 @@ public class ReplicationSource implements ReplicationSourceInterface { this.replicationEndpoint = replicationEndpoint; break; } catch (Exception e) { - LOG.warn("Error starting ReplicationEndpoint, retry", e); + LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e); replicationEndpoint.stop(); if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) { sleepMultiplier++; @@ -480,8 +492,10 @@ public class ReplicationSource implements ReplicationSourceInterface { for (;;) { peerClusterId = replicationEndpoint.getPeerUUID(); if (this.isSourceActive() && peerClusterId == null) { - LOG.debug("Could not connect to Peer ZK. Sleeping for " - + (this.sleepForRetries * sleepMultiplier) + " millis."); + if(LOG.isDebugEnabled()) { + LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(), + (this.sleepForRetries * sleepMultiplier)); + } if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { sleepMultiplier++; } @@ -499,8 +513,8 @@ public class ReplicationSource implements ReplicationSourceInterface { this.manager.removeSource(this); return; } - LOG.info("Source: {}, is now replicating from cluster: {}; to peer cluster: {};", - this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId); + LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};", + logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId); initializeWALEntryFilter(peerClusterId); // start workers @@ -533,10 +547,10 @@ public class ReplicationSource implements ReplicationSourceInterface { public void terminate(String reason, Exception cause, boolean join) { if (cause == null) { - LOG.info("Closing source " + this.queueId + " because: " + reason); + LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason); } else { - LOG.error("Closing source " + this.queueId + " because an error occurred: " + reason, - cause); + LOG.error("{} Closing source {} because an error occurred: {}", + logPeerId(), this.queueId, reason, cause); } this.sourceRunning = false; if (initThread != null && Thread.currentThread() != initThread) { @@ -560,7 +574,7 @@ public class ReplicationSource implements ReplicationSourceInterface { // Wait worker to stop Thread.sleep(this.sleepForRetries); } catch (InterruptedException e) { - LOG.info("Interrupted while waiting " + worker.getName() + " to stop"); + LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName()); Thread.currentThread().interrupt(); } // If worker still is alive after waiting, interrupt it @@ -581,15 +595,15 @@ public class ReplicationSource implements ReplicationSourceInterface { if (join) { for (ReplicationSourceShipper worker : workers) { Threads.shutdown(worker, this.sleepForRetries); - LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated"); + LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName()); } if (this.replicationEndpoint != null) { try { this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { - LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" + - this.queueId, te); + LOG.warn("{} Got exception while waiting for endpoint to shutdown " + + "for replication source : {}", logPeerId(), this.queueId, te); } } } @@ -721,4 +735,8 @@ public class ReplicationSource implements ReplicationSourceInterface { void removeWorker(ReplicationSourceShipper worker) { workerThreads.remove(worker.walGroupId, worker); } + + private String logPeerId(){ + return "[Source for peer " + this.getPeer().getId() + "]:"; + } }