HBASE-15144 Procedure v2 - Web UI displaying Store state
This commit is contained in:
parent
7c290e922a
commit
210ed97688
|
@ -22,12 +22,12 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
|
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
|
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
|
||||||
|
@ -42,24 +42,29 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
|
||||||
|
|
||||||
private ProcedureWALHeader header;
|
private ProcedureWALHeader header;
|
||||||
private FSDataInputStream stream;
|
private FSDataInputStream stream;
|
||||||
private FileStatus logStatus;
|
|
||||||
private FileSystem fs;
|
private FileSystem fs;
|
||||||
private Path logFile;
|
private Path logFile;
|
||||||
private long startPos;
|
private long startPos;
|
||||||
private long minProcId;
|
private long minProcId;
|
||||||
private long maxProcId;
|
private long maxProcId;
|
||||||
|
private long logSize;
|
||||||
|
private long timestamp;
|
||||||
|
|
||||||
public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) {
|
public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) {
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
this.logStatus = logStatus;
|
|
||||||
this.logFile = logStatus.getPath();
|
this.logFile = logStatus.getPath();
|
||||||
|
this.logSize = logStatus.getLen();
|
||||||
|
this.timestamp = logStatus.getModificationTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header, long startPos) {
|
public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header,
|
||||||
|
long startPos, long timestamp) {
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
this.logFile = logFile;
|
|
||||||
this.header = header;
|
this.header = header;
|
||||||
|
this.logFile = logFile;
|
||||||
this.startPos = startPos;
|
this.startPos = startPos;
|
||||||
|
this.logSize = startPos;
|
||||||
|
this.timestamp = timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void open() throws IOException {
|
public void open() throws IOException {
|
||||||
|
@ -77,7 +82,7 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
|
||||||
|
|
||||||
public ProcedureWALTrailer readTrailer() throws IOException {
|
public ProcedureWALTrailer readTrailer() throws IOException {
|
||||||
try {
|
try {
|
||||||
return ProcedureWALFormat.readTrailer(stream, startPos, logStatus.getLen());
|
return ProcedureWALFormat.readTrailer(stream, startPos, logSize);
|
||||||
} finally {
|
} finally {
|
||||||
stream.seek(startPos);
|
stream.seek(startPos);
|
||||||
}
|
}
|
||||||
|
@ -112,6 +117,10 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
|
||||||
return header;
|
return header;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getTimestamp() {
|
||||||
|
return timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isCompacted() {
|
public boolean isCompacted() {
|
||||||
return header.getType() == ProcedureWALFormat.LOG_TYPE_COMPACTED;
|
return header.getType() == ProcedureWALFormat.LOG_TYPE_COMPACTED;
|
||||||
}
|
}
|
||||||
|
@ -121,7 +130,14 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getSize() {
|
public long getSize() {
|
||||||
return logStatus != null ? logStatus.getLen() : 0;
|
return logSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to update in-progress log sizes. the FileStatus will report 0 otherwise.
|
||||||
|
*/
|
||||||
|
void addToSize(long size) {
|
||||||
|
this.logSize += size;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeFile() throws IOException {
|
public void removeFile() throws IOException {
|
||||||
|
|
|
@ -23,14 +23,14 @@ import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.io.util.StreamUtils;
|
import org.apache.hadoop.hbase.io.util.StreamUtils;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
||||||
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
|
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
|
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
|
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
|
||||||
|
@ -113,7 +113,7 @@ public final class ProcedureWALFormat {
|
||||||
* | offset |-----+
|
* | offset |-----+
|
||||||
* +-----------------+
|
* +-----------------+
|
||||||
*/
|
*/
|
||||||
public static void writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
|
public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
long offset = stream.getPos();
|
long offset = stream.getPos();
|
||||||
|
|
||||||
|
@ -128,6 +128,7 @@ public final class ProcedureWALFormat {
|
||||||
stream.write(TRAILER_VERSION);
|
stream.write(TRAILER_VERSION);
|
||||||
StreamUtils.writeLong(stream, TRAILER_MAGIC);
|
StreamUtils.writeLong(stream, TRAILER_MAGIC);
|
||||||
StreamUtils.writeLong(stream, offset);
|
StreamUtils.writeLong(stream, offset);
|
||||||
|
return stream.getPos() - offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ProcedureWALHeader readHeader(InputStream stream)
|
public static ProcedureWALHeader readHeader(InputStream stream)
|
||||||
|
|
|
@ -23,15 +23,17 @@ import java.io.IOException;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.hbase.ProcedureInfo;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.ProcedureInfo;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
|
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
|
||||||
|
|
||||||
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class that loads the procedures stored in a WAL
|
* Helper class that loads the procedures stored in a WAL
|
||||||
*/
|
*/
|
||||||
|
@ -142,7 +144,7 @@ public class ProcedureWALFormatReader {
|
||||||
throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry);
|
throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (InvalidProtocolBufferException e) {
|
||||||
LOG.error("got an exception while reading the procedure WAL: " + log, e);
|
LOG.error("got an exception while reading the procedure WAL: " + log, e);
|
||||||
loader.markCorruptedWAL(log, e);
|
loader.markCorruptedWAL(log, e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,23 +18,24 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.procedure2.store.wal;
|
package org.apache.hadoop.hbase.procedure2.store.wal;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import java.util.concurrent.locks.Condition;
|
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
|
||||||
import java.util.concurrent.LinkedTransferQueue;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.LinkedTransferQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.concurrent.locks.Condition;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
import org.apache.commons.collections.buffer.CircularFifoBuffer;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -98,6 +99,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
|
private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
|
||||||
private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M
|
private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M
|
||||||
|
|
||||||
|
private static final String STORE_WAL_SYNC_STATS_COUNT =
|
||||||
|
"hbase.procedure.store.wal.sync.stats.count";
|
||||||
|
private static final int DEFAULT_SYNC_STATS_COUNT = 10;
|
||||||
|
|
||||||
private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
|
private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
|
||||||
private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
|
private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
|
||||||
private final ReentrantLock lock = new ReentrantLock();
|
private final ReentrantLock lock = new ReentrantLock();
|
||||||
|
@ -133,6 +138,37 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
private boolean useHsync;
|
private boolean useHsync;
|
||||||
private int syncWaitMsec;
|
private int syncWaitMsec;
|
||||||
|
|
||||||
|
// Variables used for UI display
|
||||||
|
private CircularFifoBuffer syncMetricsBuffer;
|
||||||
|
|
||||||
|
public static class SyncMetrics {
|
||||||
|
private long timestamp;
|
||||||
|
private long syncWaitMs;
|
||||||
|
private long totalSyncedBytes;
|
||||||
|
private int syncedEntries;
|
||||||
|
private float syncedPerSec;
|
||||||
|
|
||||||
|
public long getTimestamp() {
|
||||||
|
return timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getSyncWaitMs() {
|
||||||
|
return syncWaitMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTotalSyncedBytes() {
|
||||||
|
return totalSyncedBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getSyncedEntries() {
|
||||||
|
return syncedEntries;
|
||||||
|
}
|
||||||
|
|
||||||
|
public float getSyncedPerSec() {
|
||||||
|
return syncedPerSec;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
|
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
|
||||||
final LeaseRecovery leaseRecovery) {
|
final LeaseRecovery leaseRecovery) {
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
|
@ -166,6 +202,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
|
syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
|
||||||
useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
|
useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
|
||||||
|
|
||||||
|
// WebUI
|
||||||
|
syncMetricsBuffer = new CircularFifoBuffer(
|
||||||
|
conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT));
|
||||||
|
|
||||||
// Init sync thread
|
// Init sync thread
|
||||||
syncThread = new Thread("WALProcedureStoreSyncThread") {
|
syncThread = new Thread("WALProcedureStoreSyncThread") {
|
||||||
@Override
|
@Override
|
||||||
|
@ -509,6 +549,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void syncLoop() throws Throwable {
|
private void syncLoop() throws Throwable {
|
||||||
|
long totalSyncedToStore = 0;
|
||||||
inSync.set(false);
|
inSync.set(false);
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
|
@ -533,23 +574,37 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
|
// Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
|
||||||
long syncWaitSt = System.currentTimeMillis();
|
final long syncWaitSt = System.currentTimeMillis();
|
||||||
if (slotIndex != slots.length) {
|
if (slotIndex != slots.length) {
|
||||||
slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
|
slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
|
|
||||||
|
final long currentTs = System.currentTimeMillis();
|
||||||
|
final long syncWaitMs = currentTs - syncWaitSt;
|
||||||
|
final float rollSec = getMillisFromLastRoll() / 1000.0f;
|
||||||
|
final float syncedPerSec = totalSyncedToStore / rollSec;
|
||||||
if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
|
if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
|
||||||
float rollSec = getMillisFromLastRoll() / 1000.0f;
|
|
||||||
LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
|
LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
|
||||||
StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
|
StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
|
||||||
StringUtils.humanSize(totalSynced.get()),
|
StringUtils.humanSize(totalSyncedToStore),
|
||||||
StringUtils.humanSize(totalSynced.get() / rollSec)));
|
StringUtils.humanSize(syncedPerSec)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update webui circular buffers (TODO: get rid of allocations)
|
||||||
|
final SyncMetrics syncMetrics = new SyncMetrics();
|
||||||
|
syncMetrics.timestamp = currentTs;
|
||||||
|
syncMetrics.syncWaitMs = syncWaitMs;
|
||||||
|
syncMetrics.syncedEntries = slotIndex;
|
||||||
|
syncMetrics.totalSyncedBytes = totalSyncedToStore;
|
||||||
|
syncMetrics.syncedPerSec = syncedPerSec;
|
||||||
|
syncMetricsBuffer.add(syncMetrics);
|
||||||
|
|
||||||
|
// sync
|
||||||
inSync.set(true);
|
inSync.set(true);
|
||||||
totalSynced.addAndGet(syncSlots());
|
long slotSize = syncSlots();
|
||||||
|
logs.getLast().addToSize(slotSize);
|
||||||
|
totalSyncedToStore = totalSynced.addAndGet(slotSize);
|
||||||
slotIndex = 0;
|
slotIndex = 0;
|
||||||
inSync.set(false);
|
inSync.set(false);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -569,6 +624,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ArrayList<SyncMetrics> getSyncMetrics() {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
return new ArrayList<SyncMetrics>(syncMetricsBuffer);
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private long syncSlots() throws Throwable {
|
private long syncSlots() throws Throwable {
|
||||||
int retry = 0;
|
int retry = 0;
|
||||||
int logRolled = 0;
|
int logRolled = 0;
|
||||||
|
@ -647,14 +711,14 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getMillisToNextPeriodicRoll() {
|
public long getMillisToNextPeriodicRoll() {
|
||||||
if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
|
if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
|
||||||
return periodicRollMsec - getMillisFromLastRoll();
|
return periodicRollMsec - getMillisFromLastRoll();
|
||||||
}
|
}
|
||||||
return Long.MAX_VALUE;
|
return Long.MAX_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getMillisFromLastRoll() {
|
public long getMillisFromLastRoll() {
|
||||||
return (System.currentTimeMillis() - lastRollTs.get());
|
return (System.currentTimeMillis() - lastRollTs.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -761,8 +825,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
stream = newStream;
|
stream = newStream;
|
||||||
flushLogId = logId;
|
flushLogId = logId;
|
||||||
totalSynced.set(0);
|
totalSynced.set(0);
|
||||||
lastRollTs.set(System.currentTimeMillis());
|
long rollTs = System.currentTimeMillis();
|
||||||
logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
|
lastRollTs.set(rollTs);
|
||||||
|
logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs));
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Roll new state log: " + logId);
|
LOG.debug("Roll new state log: " + logId);
|
||||||
|
@ -776,7 +841,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
try {
|
try {
|
||||||
ProcedureWALFile log = logs.getLast();
|
ProcedureWALFile log = logs.getLast();
|
||||||
log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
|
log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
|
||||||
ProcedureWALFormat.writeTrailer(stream, storeTracker);
|
long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
|
||||||
|
log.addToSize(trailerSize);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Unable to write the trailer: " + e.getMessage());
|
LOG.warn("Unable to write the trailer: " + e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
|
@ -2184,6 +2184,10 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
||||||
return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
|
return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public WALProcedureStore getWalProcedureStore() {
|
||||||
|
return procedureStore;
|
||||||
|
}
|
||||||
|
|
||||||
public int getRegionServerInfoPort(final ServerName sn) {
|
public int getRegionServerInfoPort(final ServerName sn) {
|
||||||
RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
|
RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
|
||||||
if (info == null || info.getInfoPort() == 0) {
|
if (info == null || info.getInfoPort() == 0) {
|
||||||
|
@ -2392,7 +2396,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
||||||
}
|
}
|
||||||
return regionStates.getAverageLoad();
|
return regionStates.getAverageLoad();
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* @return the count of region split plans executed
|
* @return the count of region split plans executed
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -21,18 +21,31 @@
|
||||||
import="static org.apache.commons.lang.StringEscapeUtils.escapeXml"
|
import="static org.apache.commons.lang.StringEscapeUtils.escapeXml"
|
||||||
import="java.util.Collections"
|
import="java.util.Collections"
|
||||||
import="java.util.Comparator"
|
import="java.util.Comparator"
|
||||||
|
import="java.util.ArrayList"
|
||||||
import="java.util.Date"
|
import="java.util.Date"
|
||||||
import="java.util.List"
|
import="java.util.List"
|
||||||
|
import="java.util.Set"
|
||||||
|
import="org.apache.hadoop.conf.Configuration"
|
||||||
import="org.apache.hadoop.hbase.HBaseConfiguration"
|
import="org.apache.hadoop.hbase.HBaseConfiguration"
|
||||||
import="org.apache.hadoop.hbase.ProcedureInfo"
|
import="org.apache.hadoop.hbase.ProcedureInfo"
|
||||||
import="org.apache.hadoop.hbase.master.HMaster"
|
import="org.apache.hadoop.hbase.master.HMaster"
|
||||||
import="org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv"
|
import="org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv"
|
||||||
import="org.apache.hadoop.hbase.procedure2.ProcedureExecutor"
|
import="org.apache.hadoop.hbase.procedure2.ProcedureExecutor"
|
||||||
|
import="org.apache.hadoop.hbase.procedure2.store.wal.ProcedureWALFile"
|
||||||
|
import="org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore"
|
||||||
|
import="org.apache.hadoop.hbase.procedure2.util.StringUtils"
|
||||||
|
|
||||||
%>
|
%>
|
||||||
<%
|
<%
|
||||||
HMaster master = (HMaster)getServletContext().getAttribute(HMaster.MASTER);
|
HMaster master = (HMaster)getServletContext().getAttribute(HMaster.MASTER);
|
||||||
ProcedureExecutor<MasterProcedureEnv> procExecutor = master.getMasterProcedureExecutor();
|
ProcedureExecutor<MasterProcedureEnv> procExecutor = master.getMasterProcedureExecutor();
|
||||||
|
WALProcedureStore walStore = master.getWalProcedureStore();
|
||||||
|
|
||||||
|
ArrayList<WALProcedureStore.SyncMetrics> syncMetricsBuff = walStore.getSyncMetrics();
|
||||||
|
long millisToNextRoll = walStore.getMillisToNextPeriodicRoll();
|
||||||
|
long millisFromLastRoll = walStore.getMillisFromLastRoll();
|
||||||
|
ArrayList<ProcedureWALFile> procedureWALFiles = walStore.getActiveLogs();
|
||||||
|
Set<ProcedureWALFile> corruptedWALFiles = walStore.getCorruptedLogs();
|
||||||
List<ProcedureInfo> procedures = procExecutor.listProcedures();
|
List<ProcedureInfo> procedures = procExecutor.listProcedures();
|
||||||
Collections.sort(procedures, new Comparator<ProcedureInfo>() {
|
Collections.sort(procedures, new Comparator<ProcedureInfo>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -118,7 +131,110 @@
|
||||||
<% } %>
|
<% } %>
|
||||||
</table>
|
</table>
|
||||||
</div>
|
</div>
|
||||||
|
<br>
|
||||||
|
<div class="container-fluid content">
|
||||||
|
<div class="row">
|
||||||
|
<div class="page-header">
|
||||||
|
<h2>Procedure WAL State</h2>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
<div class="tabbable">
|
||||||
|
<ul class="nav nav-pills">
|
||||||
|
<li class="active">
|
||||||
|
<a href="#tab_WALFiles" data-toggle="tab">WAL files</a>
|
||||||
|
</li>
|
||||||
|
<li class="">
|
||||||
|
<a href="#tab_WALFilesCorrupted" data-toggle="tab">Corrupted WAL files</a>
|
||||||
|
</li>
|
||||||
|
<li class="">
|
||||||
|
<a href="#tab_WALRollTime" data-toggle="tab">WAL roll time</a>
|
||||||
|
</li>
|
||||||
|
<li class="">
|
||||||
|
<a href="#tab_SyncStats" data-toggle="tab">Sync stats</a>
|
||||||
|
</li>
|
||||||
|
</ul>
|
||||||
|
<div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
|
||||||
|
<div class="tab-pane active" id="tab_WALFiles">
|
||||||
|
<% if (procedureWALFiles != null && procedureWALFiles.size() > 0) { %>
|
||||||
|
<table class="table table-striped">
|
||||||
|
<tr>
|
||||||
|
<th>LogID</th>
|
||||||
|
<th>Size</th>
|
||||||
|
<th>Timestamp</th>
|
||||||
|
<th>Path</th>
|
||||||
|
</tr>
|
||||||
|
<% for (int i = procedureWALFiles.size() - 1; i >= 0; --i) { %>
|
||||||
|
<% ProcedureWALFile pwf = procedureWALFiles.get(i); %>
|
||||||
|
<tr>
|
||||||
|
<td> <%= pwf.getLogId() %></td>
|
||||||
|
<td> <%= StringUtils.humanSize(pwf.getSize()) %> </td>
|
||||||
|
<td> <%= new Date(pwf.getTimestamp()) %></a></td>
|
||||||
|
<td> <%= escapeXml(pwf.toString()) %></t>
|
||||||
|
</tr>
|
||||||
|
<% } %>
|
||||||
|
</table>
|
||||||
|
<% } else {%>
|
||||||
|
<p> No WAL files</p>
|
||||||
|
<% } %>
|
||||||
|
</div>
|
||||||
|
<div class="tab-pane" id="tab_WALFilesCorrupted">
|
||||||
|
<% if (corruptedWALFiles != null && corruptedWALFiles.size() > 0) { %>
|
||||||
|
<table class="table table-striped">
|
||||||
|
<tr>
|
||||||
|
<th>LogID</th>
|
||||||
|
<th>Size</th>
|
||||||
|
<th>Timestamp</th>
|
||||||
|
<th>Path</th>
|
||||||
|
</tr>
|
||||||
|
<% for (ProcedureWALFile cwf:corruptedWALFiles) { %>
|
||||||
|
<tr>
|
||||||
|
<td> <%= cwf.getLogId() %></td>
|
||||||
|
<td> <%= StringUtils.humanSize(cwf.getSize()) %> </td>
|
||||||
|
<td> <%= new Date(cwf.getTimestamp()) %></a></td>
|
||||||
|
<td> <%= escapeXml(cwf.toString()) %></t>
|
||||||
|
</tr>
|
||||||
|
<% } %>
|
||||||
|
</table>
|
||||||
|
<% } else {%>
|
||||||
|
<p> No corrupted WAL files</p>
|
||||||
|
<% } %>
|
||||||
|
</div>
|
||||||
|
<div class="tab-pane" id="tab_WALRollTime">
|
||||||
|
<table class="table table-striped">
|
||||||
|
<tr>
|
||||||
|
<th> Milliseconds to next roll</th>
|
||||||
|
<th> Milliseconds from last roll</th>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td> <%=StringUtils.humanTimeDiff(millisToNextRoll) %></td>
|
||||||
|
<td> <%=StringUtils.humanTimeDiff(millisFromLastRoll) %></td>
|
||||||
|
</tr>
|
||||||
|
</table>
|
||||||
|
</div>
|
||||||
|
<div class="tab-pane" id="tab_SyncStats">
|
||||||
|
<table class="table table-striped">
|
||||||
|
<tr>
|
||||||
|
<th> Time</th>
|
||||||
|
<th> Sync Wait</th>
|
||||||
|
<th> Last num of synced entries</th>
|
||||||
|
<th> Total Synced</th>
|
||||||
|
<th> Synced per second</th>
|
||||||
|
</tr>
|
||||||
|
<% for (int i = syncMetricsBuff.size() - 1; i >= 0; --i) { %>
|
||||||
|
<% WALProcedureStore.SyncMetrics syncMetrics = syncMetricsBuff.get(i); %>
|
||||||
|
<tr>
|
||||||
|
<td> <%= new Date(syncMetrics.getTimestamp()) %></a></td>
|
||||||
|
<td> <%= StringUtils.humanTimeDiff(syncMetrics.getSyncWaitMs()) %></td>
|
||||||
|
<td> <%= syncMetrics.getSyncedEntries() %></td>
|
||||||
|
<td> <%= StringUtils.humanSize(syncMetrics.getTotalSyncedBytes()) %></td>
|
||||||
|
<td> <%= StringUtils.humanSize(syncMetrics.getSyncedPerSec()) %></td>
|
||||||
|
</tr>
|
||||||
|
<%} %>
|
||||||
|
</table>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
<script src="/static/js/jquery.min.js" type="text/javascript"></script>
|
<script src="/static/js/jquery.min.js" type="text/javascript"></script>
|
||||||
<script src="/static/js/bootstrap.min.js" type="text/javascript"></script>
|
<script src="/static/js/bootstrap.min.js" type="text/javascript"></script>
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue