HBASE-15144 Procedure v2 - Web UI displaying Store state

This commit is contained in:
Samir Ahmic 2016-02-24 16:05:24 +01:00 committed by Matteo Bertozzi
parent 77133fd225
commit 40c55915e7
6 changed files with 244 additions and 39 deletions

View File

@ -22,12 +22,12 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
@ -42,24 +42,29 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
private ProcedureWALHeader header;
private FSDataInputStream stream;
private FileStatus logStatus;
private FileSystem fs;
private Path logFile;
private long startPos;
private long minProcId;
private long maxProcId;
private long logSize;
private long timestamp;
public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) {
this.fs = fs;
this.logStatus = logStatus;
this.logFile = logStatus.getPath();
this.logSize = logStatus.getLen();
this.timestamp = logStatus.getModificationTime();
}
public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header, long startPos) {
public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header,
long startPos, long timestamp) {
this.fs = fs;
this.logFile = logFile;
this.header = header;
this.logFile = logFile;
this.startPos = startPos;
this.logSize = startPos;
this.timestamp = timestamp;
}
public void open() throws IOException {
@ -77,7 +82,7 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
public ProcedureWALTrailer readTrailer() throws IOException {
try {
return ProcedureWALFormat.readTrailer(stream, startPos, logStatus.getLen());
return ProcedureWALFormat.readTrailer(stream, startPos, logSize);
} finally {
stream.seek(startPos);
}
@ -112,6 +117,10 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
return header;
}
public long getTimestamp() {
return timestamp;
}
public boolean isCompacted() {
return header.getType() == ProcedureWALFormat.LOG_TYPE_COMPACTED;
}
@ -121,7 +130,14 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
}
public long getSize() {
return logStatus != null ? logStatus.getLen() : 0;
return logSize;
}
/**
* Used to update in-progress log sizes. the FileStatus will report 0 otherwise.
*/
void addToSize(long size) {
this.logSize += size;
}
public void removeFile() throws IOException {

View File

@ -23,14 +23,14 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
@ -113,7 +113,7 @@ public final class ProcedureWALFormat {
* | offset |-----+
* +-----------------+
*/
public static void writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
throws IOException {
long offset = stream.getPos();
@ -128,6 +128,7 @@ public final class ProcedureWALFormat {
stream.write(TRAILER_VERSION);
StreamUtils.writeLong(stream, TRAILER_MAGIC);
StreamUtils.writeLong(stream, offset);
return stream.getPos() - offset;
}
public static ProcedureWALHeader readHeader(InputStream stream)

View File

@ -23,15 +23,17 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* Helper class that loads the procedures stored in a WAL
*/
@ -142,7 +144,7 @@ public class ProcedureWALFormatReader {
throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry);
}
}
} catch (IOException e) {
} catch (InvalidProtocolBufferException e) {
LOG.error("got an exception while reading the procedure WAL: " + log, e);
loader.markCorruptedWAL(log, e);
}

View File

@ -18,23 +18,24 @@
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.Arrays;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -98,6 +99,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M
private static final String STORE_WAL_SYNC_STATS_COUNT =
"hbase.procedure.store.wal.sync.stats.count";
private static final int DEFAULT_SYNC_STATS_COUNT = 10;
private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
private final ReentrantLock lock = new ReentrantLock();
@ -133,6 +138,37 @@ public class WALProcedureStore extends ProcedureStoreBase {
private boolean useHsync;
private int syncWaitMsec;
// Variables used for UI display
private CircularFifoBuffer syncMetricsBuffer;
public static class SyncMetrics {
private long timestamp;
private long syncWaitMs;
private long totalSyncedBytes;
private int syncedEntries;
private float syncedPerSec;
public long getTimestamp() {
return timestamp;
}
public long getSyncWaitMs() {
return syncWaitMs;
}
public long getTotalSyncedBytes() {
return totalSyncedBytes;
}
public long getSyncedEntries() {
return syncedEntries;
}
public float getSyncedPerSec() {
return syncedPerSec;
}
}
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
final LeaseRecovery leaseRecovery) {
this.fs = fs;
@ -166,6 +202,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
// WebUI
syncMetricsBuffer = new CircularFifoBuffer(
conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT));
// Init sync thread
syncThread = new Thread("WALProcedureStoreSyncThread") {
@Override
@ -509,6 +549,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
private void syncLoop() throws Throwable {
long totalSyncedToStore = 0;
inSync.set(false);
lock.lock();
try {
@ -533,23 +574,37 @@ public class WALProcedureStore extends ProcedureStoreBase {
continue;
}
}
// Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
long syncWaitSt = System.currentTimeMillis();
final long syncWaitSt = System.currentTimeMillis();
if (slotIndex != slots.length) {
slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
}
long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
final long currentTs = System.currentTimeMillis();
final long syncWaitMs = currentTs - syncWaitSt;
final float rollSec = getMillisFromLastRoll() / 1000.0f;
final float syncedPerSec = totalSyncedToStore / rollSec;
if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
float rollSec = getMillisFromLastRoll() / 1000.0f;
LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
StringUtils.humanSize(totalSynced.get()),
StringUtils.humanSize(totalSynced.get() / rollSec)));
StringUtils.humanSize(totalSyncedToStore),
StringUtils.humanSize(syncedPerSec)));
}
// update webui circular buffers (TODO: get rid of allocations)
final SyncMetrics syncMetrics = new SyncMetrics();
syncMetrics.timestamp = currentTs;
syncMetrics.syncWaitMs = syncWaitMs;
syncMetrics.syncedEntries = slotIndex;
syncMetrics.totalSyncedBytes = totalSyncedToStore;
syncMetrics.syncedPerSec = syncedPerSec;
syncMetricsBuffer.add(syncMetrics);
// sync
inSync.set(true);
totalSynced.addAndGet(syncSlots());
long slotSize = syncSlots();
logs.getLast().addToSize(slotSize);
totalSyncedToStore = totalSynced.addAndGet(slotSize);
slotIndex = 0;
inSync.set(false);
} catch (InterruptedException e) {
@ -569,6 +624,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
public ArrayList<SyncMetrics> getSyncMetrics() {
lock.lock();
try {
return new ArrayList<SyncMetrics>(syncMetricsBuffer);
} finally {
lock.unlock();
}
}
private long syncSlots() throws Throwable {
int retry = 0;
int logRolled = 0;
@ -647,14 +711,14 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
private long getMillisToNextPeriodicRoll() {
public long getMillisToNextPeriodicRoll() {
if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
return periodicRollMsec - getMillisFromLastRoll();
}
return Long.MAX_VALUE;
}
private long getMillisFromLastRoll() {
public long getMillisFromLastRoll() {
return (System.currentTimeMillis() - lastRollTs.get());
}
@ -761,8 +825,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
stream = newStream;
flushLogId = logId;
totalSynced.set(0);
lastRollTs.set(System.currentTimeMillis());
logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
long rollTs = System.currentTimeMillis();
lastRollTs.set(rollTs);
logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs));
if (LOG.isDebugEnabled()) {
LOG.debug("Roll new state log: " + logId);
@ -776,7 +841,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
try {
ProcedureWALFile log = logs.getLast();
log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
ProcedureWALFormat.writeTrailer(stream, storeTracker);
long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
log.addToSize(trailerSize);
} catch (IOException e) {
LOG.warn("Unable to write the trailer: " + e.getMessage());
}

View File

@ -336,7 +336,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// handle table states
private TableStateManager tableStateManager;
private long splitPlanCount;
private long mergePlanCount;
@ -2159,6 +2159,10 @@ public class HMaster extends HRegionServer implements MasterServices {
return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
}
public WALProcedureStore getWalProcedureStore() {
return procedureStore;
}
public int getRegionServerInfoPort(final ServerName sn) {
RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
if (info == null || info.getInfoPort() == 0) {
@ -2358,7 +2362,7 @@ public class HMaster extends HRegionServer implements MasterServices {
}
return regionStates.getAverageLoad();
}
/*
* @return the count of region split plans executed
*/

View File

@ -21,18 +21,31 @@
import="static org.apache.commons.lang.StringEscapeUtils.escapeXml"
import="java.util.Collections"
import="java.util.Comparator"
import="java.util.ArrayList"
import="java.util.Date"
import="java.util.List"
import="java.util.Set"
import="org.apache.hadoop.conf.Configuration"
import="org.apache.hadoop.hbase.HBaseConfiguration"
import="org.apache.hadoop.hbase.ProcedureInfo"
import="org.apache.hadoop.hbase.master.HMaster"
import="org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv"
import="org.apache.hadoop.hbase.procedure2.ProcedureExecutor"
import="org.apache.hadoop.hbase.procedure2.store.wal.ProcedureWALFile"
import="org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore"
import="org.apache.hadoop.hbase.procedure2.util.StringUtils"
%>
<%
HMaster master = (HMaster)getServletContext().getAttribute(HMaster.MASTER);
ProcedureExecutor<MasterProcedureEnv> procExecutor = master.getMasterProcedureExecutor();
WALProcedureStore walStore = master.getWalProcedureStore();
ArrayList<WALProcedureStore.SyncMetrics> syncMetricsBuff = walStore.getSyncMetrics();
long millisToNextRoll = walStore.getMillisToNextPeriodicRoll();
long millisFromLastRoll = walStore.getMillisFromLastRoll();
ArrayList<ProcedureWALFile> procedureWALFiles = walStore.getActiveLogs();
Set<ProcedureWALFile> corruptedWALFiles = walStore.getCorruptedLogs();
List<ProcedureInfo> procedures = procExecutor.listProcedures();
Collections.sort(procedures, new Comparator<ProcedureInfo>() {
@Override
@ -118,7 +131,110 @@
<% } %>
</table>
</div>
<br>
<div class="container-fluid content">
<div class="row">
<div class="page-header">
<h2>Procedure WAL State</h2>
</div>
</div>
<div class="tabbable">
<ul class="nav nav-pills">
<li class="active">
<a href="#tab_WALFiles" data-toggle="tab">WAL files</a>
</li>
<li class="">
<a href="#tab_WALFilesCorrupted" data-toggle="tab">Corrupted WAL files</a>
</li>
<li class="">
<a href="#tab_WALRollTime" data-toggle="tab">WAL roll time</a>
</li>
<li class="">
<a href="#tab_SyncStats" data-toggle="tab">Sync stats</a>
</li>
</ul>
<div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
<div class="tab-pane active" id="tab_WALFiles">
<% if (procedureWALFiles != null && procedureWALFiles.size() > 0) { %>
<table class="table table-striped">
<tr>
<th>LogID</th>
<th>Size</th>
<th>Timestamp</th>
<th>Path</th>
</tr>
<% for (int i = procedureWALFiles.size() - 1; i >= 0; --i) { %>
<% ProcedureWALFile pwf = procedureWALFiles.get(i); %>
<tr>
<td> <%= pwf.getLogId() %></td>
<td> <%= StringUtils.humanSize(pwf.getSize()) %> </td>
<td> <%= new Date(pwf.getTimestamp()) %></a></td>
<td> <%= escapeXml(pwf.toString()) %></t>
</tr>
<% } %>
</table>
<% } else {%>
<p> No WAL files</p>
<% } %>
</div>
<div class="tab-pane" id="tab_WALFilesCorrupted">
<% if (corruptedWALFiles != null && corruptedWALFiles.size() > 0) { %>
<table class="table table-striped">
<tr>
<th>LogID</th>
<th>Size</th>
<th>Timestamp</th>
<th>Path</th>
</tr>
<% for (ProcedureWALFile cwf:corruptedWALFiles) { %>
<tr>
<td> <%= cwf.getLogId() %></td>
<td> <%= StringUtils.humanSize(cwf.getSize()) %> </td>
<td> <%= new Date(cwf.getTimestamp()) %></a></td>
<td> <%= escapeXml(cwf.toString()) %></t>
</tr>
<% } %>
</table>
<% } else {%>
<p> No corrupted WAL files</p>
<% } %>
</div>
<div class="tab-pane" id="tab_WALRollTime">
<table class="table table-striped">
<tr>
<th> Milliseconds to next roll</th>
<th> Milliseconds from last roll</th>
</tr>
<tr>
<td> <%=StringUtils.humanTimeDiff(millisToNextRoll) %></td>
<td> <%=StringUtils.humanTimeDiff(millisFromLastRoll) %></td>
</tr>
</table>
</div>
<div class="tab-pane" id="tab_SyncStats">
<table class="table table-striped">
<tr>
<th> Time</th>
<th> Sync Wait</th>
<th> Last num of synced entries</th>
<th> Total Synced</th>
<th> Synced per second</th>
</tr>
<% for (int i = syncMetricsBuff.size() - 1; i >= 0; --i) { %>
<% WALProcedureStore.SyncMetrics syncMetrics = syncMetricsBuff.get(i); %>
<tr>
<td> <%= new Date(syncMetrics.getTimestamp()) %></a></td>
<td> <%= StringUtils.humanTimeDiff(syncMetrics.getSyncWaitMs()) %></td>
<td> <%= syncMetrics.getSyncedEntries() %></td>
<td> <%= StringUtils.humanSize(syncMetrics.getTotalSyncedBytes()) %></td>
<td> <%= StringUtils.humanSize(syncMetrics.getSyncedPerSec()) %></td>
</tr>
<%} %>
</table>
</div>
</div>
</div>
</div>
<script src="/static/js/jquery.min.js" type="text/javascript"></script>
<script src="/static/js/bootstrap.min.js" type="text/javascript"></script>