HBASE-9460 Fix HLogPerformanceEvaluation so runs against localfs

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1522073 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-09-11 22:22:17 +00:00
parent 6fa574e115
commit b685cf69da
3 changed files with 58 additions and 36 deletions

View File

@ -23,7 +23,6 @@ import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -42,10 +41,12 @@ import org.apache.hadoop.io.Writable;
@InterfaceAudience.Private @InterfaceAudience.Private
// TODO: Rename interface to WAL
public interface HLog { public interface HLog {
Log LOG = LogFactory.getLog(HLog.class); Log LOG = LogFactory.getLog(HLog.class);
/** File Extension used while splitting an HLog into regions (HBASE-2312) */ /** File Extension used while splitting an HLog into regions (HBASE-2312) */
// TODO: this seems like an implementation detail that does not belong here.
String SPLITTING_EXT = "-splitting"; String SPLITTING_EXT = "-splitting";
boolean SPLIT_SKIP_ERRORS_DEFAULT = false; boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
/** The hbase:meta region's HLog filename extension */ /** The hbase:meta region's HLog filename extension */
@ -55,10 +56,11 @@ public interface HLog {
* Configuration name of HLog Trailer's warning size. If a waltrailer's size is greater than the * Configuration name of HLog Trailer's warning size. If a waltrailer's size is greater than the
* configured size, a warning is logged. This is used with Protobuf reader/writer. * configured size, a warning is logged. This is used with Protobuf reader/writer.
*/ */
String WAL_TRAILER_WARN_SIZE = // TODO: Implementation detail. Why in here?
"hbase.regionserver.waltrailer.warn.size"; String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024*1024; // 1MB int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
// TODO: Implemenation detail. Why in here?
Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+"); Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
@ -67,7 +69,7 @@ public interface HLog {
/** /**
* @param fs File system. * @param fs File system.
* @param path Path. * @param path Path.
* @param c Config. * @param c Configuration.
* @param s Input stream that may have been pre-opened by the caller; may be null. * @param s Input stream that may have been pre-opened by the caller; may be null.
*/ */
void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException; void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
@ -87,6 +89,7 @@ public interface HLog {
* @return the WALTrailer of the current HLog. It may be null in case of legacy or corrupt WAL * @return the WALTrailer of the current HLog. It may be null in case of legacy or corrupt WAL
* files. * files.
*/ */
// TODO: What we need a trailer on WAL for?
WALTrailer getWALTrailer(); WALTrailer getWALTrailer();
} }
@ -109,9 +112,10 @@ public interface HLog {
} }
/** /**
* Utility class that lets us keep track of the edit with it's key Only used * Utility class that lets us keep track of the edit with it's key.
* when splitting logs * Only used when splitting logs.
*/ */
// TODO: Remove this Writable.
class Entry implements Writable { class Entry implements Writable {
private WALEdit edit; private WALEdit edit;
private HLogKey key; private HLogKey key;
@ -124,10 +128,8 @@ public interface HLog {
/** /**
* Constructor for both params * Constructor for both params
* *
* @param edit * @param edit log's edit
* log's edit * @param key log's key
* @param key
* log's key
*/ */
public Entry(HLogKey key, WALEdit edit) { public Entry(HLogKey key, WALEdit edit) {
super(); super();
@ -199,15 +201,13 @@ public interface HLog {
/** /**
* @return Current state of the monotonically increasing file id. * @return Current state of the monotonically increasing file id.
*/ */
// TODO: Remove. Implementation detail.
long getFilenum(); long getFilenum();
/** /**
* Called by HRegionServer when it opens a new region to ensure that log * Called to ensure that log sequence numbers are always greater
* sequence numbers are always greater than the latest sequence number of the
* region being brought on-line.
* *
* @param newvalue * @param newvalue We'll set log edit/sequence number to this value if it is greater
* We'll set log edit/sequence number to this value if it is greater
* than the current value. * than the current value.
*/ */
void setSequenceNumber(final long newvalue); void setSequenceNumber(final long newvalue);
@ -217,6 +217,7 @@ public interface HLog {
*/ */
long getSequenceNumber(); long getSequenceNumber();
// TODO: Log rolling should not be in this interface.
/** /**
* Roll the log writer. That is, start writing log messages to a new file. * Roll the log writer. That is, start writing log messages to a new file.
* *
@ -274,7 +275,7 @@ public interface HLog {
/** /**
* Append a set of edits to the log. Log edits are keyed by (encoded) * Append a set of edits to the log. Log edits are keyed by (encoded)
* regionName, rowname, and log-sequence-id. The HLog is flushed after this * regionName, row name, and log-sequence-id. The HLog is flushed after this
* transaction is written to the log. * transaction is written to the log.
* @param info * @param info
* @param tableName * @param tableName
@ -301,6 +302,7 @@ public interface HLog {
public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
List<UUID> clusterIds, final long now, HTableDescriptor htd) throws IOException; List<UUID> clusterIds, final long now, HTableDescriptor htd) throws IOException;
// TODO: Do we need all these versions of sync?
void hsync() throws IOException; void hsync() throws IOException;
void hflush() throws IOException; void hflush() throws IOException;
@ -312,6 +314,7 @@ public interface HLog {
/** /**
* Obtain a log sequence number. * Obtain a log sequence number.
*/ */
// TODO: Name better to differentiate from getSequenceNumber.
long obtainSeqNum(); long obtainSeqNum();
/** /**
@ -355,6 +358,7 @@ public interface HLog {
* *
* @return lowReplicationRollEnabled * @return lowReplicationRollEnabled
*/ */
// TODO: This is implementation detail?
boolean isLowReplicationRollEnabled(); boolean isLowReplicationRollEnabled();
/** Gets the earliest sequence number in the memstore for this particular region. /** Gets the earliest sequence number in the memstore for this particular region.

View File

@ -193,19 +193,24 @@ public class ProtobufLogReader extends ReaderBase {
@Override @Override
protected boolean readNext(HLog.Entry entry) throws IOException { protected boolean readNext(HLog.Entry entry) throws IOException {
while (true) { while (true) {
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
long originalPosition = this.inputStream.getPos(); long originalPosition = this.inputStream.getPos();
if (trailerPresent && originalPosition == this.walEditsStopOffset) return false; if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
return false;
}
WALKey.Builder builder = WALKey.newBuilder(); WALKey.Builder builder = WALKey.newBuilder();
int size = 0; long size = 0;
try { try {
int originalAvailable = this.inputStream.available(); long available = -1;
try { try {
int firstByte = this.inputStream.read(); int firstByte = this.inputStream.read();
if (firstByte == -1) { if (firstByte == -1) {
throw new EOFException("First byte is negative"); throw new EOFException("First byte is negative");
} }
size = CodedInputStream.readRawVarint32(firstByte, this.inputStream); size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
if (this.inputStream.available() < size) { // available may be < 0 on local fs for instance. If so, can't depend on it.
available = this.inputStream.available();
if (available > 0 && available < size) {
throw new EOFException("Available stream not enough for edit, " + throw new EOFException("Available stream not enough for edit, " +
"inputStream.available()= " + this.inputStream.available() + ", " + "inputStream.available()= " + this.inputStream.available() + ", " +
"entry size= " + size); "entry size= " + size);
@ -215,8 +220,7 @@ public class ProtobufLogReader extends ReaderBase {
} catch (InvalidProtocolBufferException ipbe) { } catch (InvalidProtocolBufferException ipbe) {
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" + throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
originalPosition + ", currentPosition=" + this.inputStream.getPos() + originalPosition + ", currentPosition=" + this.inputStream.getPos() +
", messageSize=" + size + ", originalAvailable=" + originalAvailable + ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
", currentAvailable=" + this.inputStream.available()).initCause(ipbe);
} }
if (!builder.isInitialized()) { if (!builder.isInitialized()) {
// TODO: not clear if we should try to recover from corrupt PB that looks semi-legit. // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
@ -258,6 +262,10 @@ public class ProtobufLogReader extends ReaderBase {
} }
} catch (EOFException eof) { } catch (EOFException eof) {
LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof); LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof);
// If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
if (originalPosition < 0) throw eof;
// Else restore our position to original location in hope that next time through we will
// read successfully.
seekOnFs(originalPosition); seekOnFs(originalPosition);
return false; return false;
} }

View File

@ -69,7 +69,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
/** /**
* Perform HLog.append() of Put object, for the number of iterations requested. * Perform HLog.append() of Put object, for the number of iterations requested.
* Keys and Vaues are generated randomly, the number of column familes, * Keys and Vaues are generated randomly, the number of column families,
* qualifiers and key/value size is tunable by the user. * qualifiers and key/value size is tunable by the user.
*/ */
class HLogPutBenchmark implements Runnable { class HLogPutBenchmark implements Runnable {
@ -126,6 +126,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
boolean noSync = false; boolean noSync = false;
boolean verify = false; boolean verify = false;
boolean verbose = false; boolean verbose = false;
boolean cleanup = true;
long roll = Long.MAX_VALUE; long roll = Long.MAX_VALUE;
// Process command line args // Process command line args
for (int i = 0; i < args.length; i++) { for (int i = 0; i < args.length; i++) {
@ -151,6 +152,8 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
verify = true; verify = true;
} else if (cmd.equals("-verbose")) { } else if (cmd.equals("-verbose")) {
verbose = true; verbose = true;
} else if (cmd.equals("-nocleanup")) {
cleanup = false;
} else if (cmd.equals("-roll")) { } else if (cmd.equals("-roll")) {
roll = Long.parseLong(args[++i]); roll = Long.parseLong(args[++i]);
} else if (cmd.equals("-h")) { } else if (cmd.equals("-h")) {
@ -205,8 +208,12 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
if (verify) { if (verify) {
Path dir = ((FSHLog) hlog).getDir(); Path dir = ((FSHLog) hlog).getDir();
long editCount = 0; long editCount = 0;
for (FileStatus fss: fs.listStatus(dir)) { FileStatus [] fsss = fs.listStatus(dir);
editCount += verify(fss.getPath(), verbose); if (fsss.length == 0) throw new IllegalStateException("No WAL found");
for (FileStatus fss: fsss) {
Path p = fss.getPath();
if (!fs.exists(p)) throw new IllegalStateException(p.toString());
editCount += verify(p, verbose);
} }
long expected = numIterations * numThreads; long expected = numIterations * numThreads;
if (editCount != expected) { if (editCount != expected) {
@ -216,7 +223,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
} finally { } finally {
if (region != null) closeRegion(region); if (region != null) closeRegion(region);
// Remove the root dir for this test region // Remove the root dir for this test region
cleanRegionRootDir(fs, rootRegionDir); if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
} }
} finally { } finally {
fs.close(); fs.close();
@ -243,14 +250,16 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
* @throws IOException * @throws IOException
*/ */
private long verify(final Path wal, final boolean verbose) throws IOException { private long verify(final Path wal, final boolean verbose) throws IOException {
HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()), HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()), wal, getConf());
wal, getConf());
long previousSeqid = -1; long previousSeqid = -1;
long count = 0; long count = 0;
try { try {
while (true) { while (true) {
Entry e = reader.next(); Entry e = reader.next();
if (e == null) break; if (e == null) {
LOG.debug("Read count=" + count + " from " + wal);
break;
}
count++; count++;
long seqid = e.getKey().getLogSeqNum(); long seqid = e.getKey().getLogSeqNum();
if (verbose) LOG.info("seqid=" + seqid); if (verbose) LOG.info("seqid=" + seqid);
@ -282,6 +291,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
System.err.println(" -qualifiers <N> Number of qualifiers to write."); System.err.println(" -qualifiers <N> Number of qualifiers to write.");
System.err.println(" -keySize <N> Row key size in byte."); System.err.println(" -keySize <N> Row key size in byte.");
System.err.println(" -valueSize <N> Row/Col value size in byte."); System.err.println(" -valueSize <N> Row/Col value size in byte.");
System.err.println(" -nocleanup Do NOT remove test data when done.");
System.err.println(" -nosync Append without syncing"); System.err.println(" -nosync Append without syncing");
System.err.println(" -verify Verify edits written in sequence"); System.err.println(" -verify Verify edits written in sequence");
System.err.println(" -verbose Output extra info; e.g. all edit seq ids when verifying"); System.err.println(" -verbose Output extra info; e.g. all edit seq ids when verifying");
@ -342,7 +352,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
Thread[] threads = new Thread[numThreads]; Thread[] threads = new Thread[numThreads];
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
for (int i = 0; i < numThreads; ++i) { for (int i = 0; i < numThreads; ++i) {
threads[i] = new Thread(runnable); threads[i] = new Thread(runnable, "t" + i);
threads[i].start(); threads[i].start();
} }
for (Thread t : threads) t.join(); for (Thread t : threads) t.join();