HBASE-6804 [replication] lower the amount of logging to a more human-readable level

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1416735 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2012-12-04 00:09:49 +00:00
parent abd334265e
commit 211e4e019b
8 changed files with 115 additions and 13 deletions

View File

@ -34,6 +34,7 @@ public class MetricsSink {
public static final String SINK_APPLIED_OPS = "sink.appliedOps"; public static final String SINK_APPLIED_OPS = "sink.appliedOps";
private MetricsReplicationSource rms; private MetricsReplicationSource rms;
private long lastTimestampForAge = System.currentTimeMillis();
public MetricsSink() { public MetricsSink() {
rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class); rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class);
@ -43,10 +44,22 @@ public class MetricsSink {
* Set the age of the last applied operation * Set the age of the last applied operation
* *
* @param timestamp The timestamp of the last operation applied. * @param timestamp The timestamp of the last operation applied.
* @return the age that was set
*/ */
public void setAgeOfLastAppliedOp(long timestamp) { public long setAgeOfLastAppliedOp(long timestamp) {
long age = System.currentTimeMillis() - timestamp; lastTimestampForAge = timestamp;
long age = System.currentTimeMillis() - lastTimestampForAge;
rms.setGauge(SINK_AGE_OF_LAST_APPLIED_OP, age); rms.setGauge(SINK_AGE_OF_LAST_APPLIED_OP, age);
return age;
}
/**
* Refreshing the age makes sure the value returned is the actual one and
* not the one set at replication time
* @return refreshed age
*/
public long refreshAgeOfLastAppliedOp() {
return setAgeOfLastAppliedOp(lastTimestampForAge);
} }
/** /**

View File

@ -21,8 +21,14 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -52,6 +58,8 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
@InterfaceAudience.Private @InterfaceAudience.Private
public class Replication implements WALActionsListener, public class Replication implements WALActionsListener,
ReplicationSourceService, ReplicationSinkService { ReplicationSourceService, ReplicationSinkService {
private static final Log LOG =
LogFactory.getLog(Replication.class);
private boolean replication; private boolean replication;
private ReplicationSourceManager replicationManager; private ReplicationSourceManager replicationManager;
private final AtomicBoolean replicating = new AtomicBoolean(true); private final AtomicBoolean replicating = new AtomicBoolean(true);
@ -60,6 +68,9 @@ public class Replication implements WALActionsListener,
private ReplicationSink replicationSink; private ReplicationSink replicationSink;
// Hosting server // Hosting server
private Server server; private Server server;
/** Statistics thread schedule pool */
private ScheduledExecutorService scheduleThreadPool;
private int statsThreadPeriod;
/** /**
* Instantiate the replication management (if rep is enabled). * Instantiate the replication management (if rep is enabled).
@ -85,6 +96,11 @@ public class Replication implements WALActionsListener,
this.server = server; this.server = server;
this.conf = this.server.getConfiguration(); this.conf = this.server.getConfiguration();
this.replication = isReplication(this.conf); this.replication = isReplication(this.conf);
this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat(server.getServerName() + "Replication Statistics #%d")
.setDaemon(true)
.build());
if (replication) { if (replication) {
try { try {
this.zkHelper = new ReplicationZookeeper(server, this.replicating); this.zkHelper = new ReplicationZookeeper(server, this.replicating);
@ -94,6 +110,9 @@ public class Replication implements WALActionsListener,
} }
this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs, this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs,
this.replicating, logDir, oldLogDir); this.replicating, logDir, oldLogDir);
this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
} else { } else {
this.replicationManager = null; this.replicationManager = null;
this.zkHelper = null; this.zkHelper = null;
@ -151,6 +170,9 @@ public class Replication implements WALActionsListener,
if (this.replication) { if (this.replication) {
this.replicationManager.init(); this.replicationManager.init();
this.replicationSink = new ReplicationSink(this.conf, this.server); this.replicationSink = new ReplicationSink(this.conf, this.server);
this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
} }
} }
@ -232,4 +254,32 @@ public class Replication implements WALActionsListener,
public void logCloseRequested() { public void logCloseRequested() {
// not interested // not interested
} }
/*
* Statistics thread. Periodically prints the replication statistics to the log.
*/
static class ReplicationStatisticsThread extends Thread {
private final ReplicationSink replicationSink;
private final ReplicationSourceManager replicationManager;
public ReplicationStatisticsThread(final ReplicationSink replicationSink,
final ReplicationSourceManager replicationManager) {
super("ReplicationStatisticsThread");
this.replicationManager = replicationManager;
this.replicationSink = replicationSink;
}
@Override
public void run() {
printStats(this.replicationManager.getStats());
printStats(this.replicationSink.getStats());
}
private void printStats(String stats) {
if (!stats.isEmpty()) {
LOG.info(stats);
}
}
}
} }

View File

@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -73,6 +74,7 @@ public class ReplicationSink {
private final ExecutorService sharedThreadPool; private final ExecutorService sharedThreadPool;
private final HConnection sharedHtableCon; private final HConnection sharedHtableCon;
private final MetricsSink metrics; private final MetricsSink metrics;
private final AtomicLong totalReplicatedEdits = new AtomicLong();
/** /**
* Create a sink for replication * Create a sink for replication
@ -158,7 +160,7 @@ public class ReplicationSink {
this.metrics.setAgeOfLastAppliedOp( this.metrics.setAgeOfLastAppliedOp(
entries[entries.length-1].getKey().getWriteTime()); entries[entries.length-1].getKey().getWriteTime());
this.metrics.applyBatch(entries.length); this.metrics.applyBatch(entries.length);
LOG.info("Total replicated: " + totalReplicated); this.totalReplicatedEdits.addAndGet(totalReplicated);
} catch (IOException ex) { } catch (IOException ex) {
LOG.error("Unable to accept edit because:", ex); LOG.error("Unable to accept edit because:", ex);
throw ex; throw ex;
@ -226,4 +228,15 @@ public class ReplicationSink {
} }
} }
} }
/**
* Get a string representation of this sink's metrics
* @return string with the total replicated edits count and the date
* of the last edit that was applied
*/
public String getStats() {
return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
"age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
", total replicated edits: " + this.totalReplicatedEdits;
}
} }

View File

@ -454,9 +454,6 @@ public class ReplicationSource extends Thread
break; break;
} }
} }
LOG.debug("currentNbOperations:" + currentNbOperations +
" and seenEntries:" + seenEntries +
" and size: " + (this.reader.getPosition() - startPosition));
if (currentWALisBeingWrittenTo) { if (currentWALisBeingWrittenTo) {
return false; return false;
} }
@ -512,8 +509,6 @@ public class ReplicationSource extends Thread
*/ */
protected boolean openReader(int sleepMultiplier) { protected boolean openReader(int sleepMultiplier) {
try { try {
LOG.debug("Opening log for replication " + this.currentPath.getName() +
" at " + this.position);
try { try {
this.reader = null; this.reader = null;
this.reader = HLogFactory.createReader(this.fs, this.reader = HLogFactory.createReader(this.fs,
@ -651,7 +646,6 @@ public class ReplicationSource extends Thread
} }
try { try {
AdminProtocol rrs = getRS(); AdminProtocol rrs = getRS();
LOG.debug("Replicating " + currentNbEntries);
ProtobufUtil.replicateWALEntry(rrs, ProtobufUtil.replicateWALEntry(rrs,
Arrays.copyOf(this.entriesArray, currentNbEntries)); Arrays.copyOf(this.entriesArray, currentNbEntries));
if (this.lastLoggedPosition != this.position) { if (this.lastLoggedPosition != this.position) {
@ -663,7 +657,6 @@ public class ReplicationSource extends Thread
this.metrics.shipBatch(this.currentNbOperations); this.metrics.shipBatch(this.currentNbOperations);
this.metrics.setAgeOfLastShippedOp( this.metrics.setAgeOfLastShippedOp(
this.entriesArray[currentNbEntries-1].getKey().getWriteTime()); this.entriesArray[currentNbEntries-1].getKey().getWriteTime());
LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
break; break;
} catch (IOException ioe) { } catch (IOException ioe) {
@ -846,4 +839,11 @@ public class ReplicationSource extends Thread
return Long.parseLong(parts[parts.length-1]); return Long.parseLong(parts[parts.length-1]);
} }
} }
@Override
public String getStats() {
return "Total replicated edits: " + totalReplicatedEdits +
", currently replicating from: " + this.currentPath +
" at position: " + this.position;
}
} }

View File

@ -94,4 +94,11 @@ public interface ReplicationSourceInterface {
*/ */
public String getPeerClusterId(); public String getPeerClusterId();
/**
* Get a string representation of the current statistics
* for this source
* @return printable stats
*/
public String getStats();
} }

View File

@ -151,7 +151,6 @@ public class ReplicationSourceManager {
public void logPositionAndCleanOldLogs(Path log, String id, long position, public void logPositionAndCleanOldLogs(Path log, String id, long position,
boolean queueRecovered, boolean holdLogInZK) { boolean queueRecovered, boolean holdLogInZK) {
String key = log.getName(); String key = log.getName();
LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
this.zkHelper.writeReplicationStatus(key, id, position); this.zkHelper.writeReplicationStatus(key, id, position);
if (holdLogInZK) { if (holdLogInZK) {
return; return;
@ -160,8 +159,6 @@ public class ReplicationSourceManager {
SortedSet<String> hlogs = this.hlogsById.get(id); SortedSet<String> hlogs = this.hlogsById.get(id);
if (!queueRecovered && hlogs.first() != key) { if (!queueRecovered && hlogs.first() != key) {
SortedSet<String> hlogSet = hlogs.headSet(key); SortedSet<String> hlogSet = hlogs.headSet(key);
LOG.info("Removing " + hlogSet.size() +
" logs in the list: " + hlogSet);
for (String hlog : hlogSet) { for (String hlog : hlogSet) {
this.zkHelper.removeLogFromList(hlog, id); this.zkHelper.removeLogFromList(hlog, id);
} }
@ -638,4 +635,20 @@ public class ReplicationSourceManager {
public FileSystem getFs() { public FileSystem getFs() {
return this.fs; return this.fs;
} }
/**
* Get a string representation of all the sources' metrics
*/
public String getStats() {
StringBuffer stats = new StringBuffer();
for (ReplicationSourceInterface source : sources) {
stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
stats.append(source.getStats() + "\n");
}
for (ReplicationSourceInterface oldSource : oldsources) {
stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
stats.append(oldSource.getStats()+ "\n");
}
return stats.toString();
}
} }

View File

@ -80,4 +80,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
public String getPeerClusterId() { public String getPeerClusterId() {
return peerClusterId; return peerClusterId;
} }
@Override
public String getStats() {
return "";
}
} }

View File

@ -104,6 +104,7 @@ public class TestReplication {
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf1.setBoolean("dfs.support.append", true); conf1.setBoolean("dfs.support.append", true);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf1.setInt("replication.stats.thread.period.seconds", 5);
utility1 = new HBaseTestingUtility(conf1); utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster(); utility1.startMiniZKCluster();