From b685cf69dae131832b0d76c06429f2d65c9cbc3c Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Wed, 11 Sep 2013 22:22:17 +0000 Subject: [PATCH] HBASE-9460 Fix HLogPerformanceEvaluation so runs against localfs git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1522073 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/regionserver/wal/HLog.java | 44 ++++++++++--------- .../regionserver/wal/ProtobufLogReader.java | 22 +++++++--- .../wal/HLogPerformanceEvaluation.java | 28 ++++++++---- 3 files changed, 58 insertions(+), 36 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index c8eae4bc720..8528863edab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -23,7 +23,6 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.List; -import java.util.Set; import java.util.UUID; import java.util.regex.Pattern; @@ -42,10 +41,12 @@ import org.apache.hadoop.io.Writable; @InterfaceAudience.Private +// TODO: Rename interface to WAL public interface HLog { Log LOG = LogFactory.getLog(HLog.class); /** File Extension used while splitting an HLog into regions (HBASE-2312) */ + // TODO: this seems like an implementation detail that does not belong here. String SPLITTING_EXT = "-splitting"; boolean SPLIT_SKIP_ERRORS_DEFAULT = false; /** The hbase:meta region's HLog filename extension */ @@ -55,10 +56,11 @@ public interface HLog { * Configuration name of HLog Trailer's warning size. If a waltrailer's size is greater than the * configured size, a warning is logged. This is used with Protobuf reader/writer. */ - String WAL_TRAILER_WARN_SIZE = - "hbase.regionserver.waltrailer.warn.size"; - int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024*1024; // 1MB + // TODO: Implementation detail. 
Why in here? + String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size"; + int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB + // TODO: Implementation detail. Why in here? Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+"); String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; @@ -67,7 +69,7 @@ public interface HLog { /** * @param fs File system. * @param path Path. - * @param c Config. + * @param c Configuration. * @param s Input stream that may have been pre-opened by the caller; may be null. */ void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException; @@ -87,6 +89,7 @@ public interface HLog { * @return the WALTrailer of the current HLog. It may be null in case of legacy or corrupt WAL * files. */ + // TODO: What do we need a trailer on WAL for? WALTrailer getWALTrailer(); } @@ -109,9 +112,10 @@ public interface HLog { } /** - * Utility class that lets us keep track of the edit with it's key Only used - * when splitting logs + * Utility class that lets us keep track of the edit with its key. + * Only used when splitting logs. */ + // TODO: Remove this Writable. class Entry implements Writable { private WALEdit edit; private HLogKey key; @@ -124,10 +128,8 @@ public interface HLog { /** * Constructor for both params * - * @param edit - * log's edit - * @param key - * log's key + * @param edit log's edit + * @param key log's key */ public Entry(HLogKey key, WALEdit edit) { super(); @@ -199,16 +201,14 @@ public interface HLog { /** * @return Current state of the monotonically increasing file id. */ + // TODO: Remove. Implementation detail. long getFilenum(); /** - * Called by HRegionServer when it opens a new region to ensure that log - * sequence numbers are always greater than the latest sequence number of the - * region being brought on-line. 
+ * Called to ensure that log sequence numbers are always greater * - * @param newvalue - * We'll set log edit/sequence number to this value if it is greater - * than the current value. + * @param newvalue We'll set log edit/sequence number to this value if it is greater + * than the current value. */ void setSequenceNumber(final long newvalue); @@ -217,6 +217,7 @@ public interface HLog { */ long getSequenceNumber(); + // TODO: Log rolling should not be in this interface. /** * Roll the log writer. That is, start writing log messages to a new file. * @@ -274,7 +275,7 @@ public interface HLog { /** * Append a set of edits to the log. Log edits are keyed by (encoded) - * regionName, rowname, and log-sequence-id. The HLog is flushed after this + * regionName, row name, and log-sequence-id. The HLog is flushed after this * transaction is written to the log. * @param info * @param tableName @@ -298,9 +299,10 @@ public interface HLog { * @return txid of this transaction * @throws IOException */ - public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, + public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, List clusterIds, final long now, HTableDescriptor htd) throws IOException; - + + // TODO: Do we need all these versions of sync? void hsync() throws IOException; void hflush() throws IOException; @@ -312,6 +314,7 @@ public interface HLog { /** * Obtain a log sequence number. */ + // TODO: Name better to differentiate from getSequenceNumber. long obtainSeqNum(); /** @@ -355,6 +358,7 @@ public interface HLog { * * @return lowReplicationRollEnabled */ + // TODO: This is implementation detail? boolean isLowReplicationRollEnabled(); /** Gets the earliest sequence number in the memstore for this particular region. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 02458656e94..26d8e61ab3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -193,19 +193,24 @@ public class ProtobufLogReader extends ReaderBase { @Override protected boolean readNext(HLog.Entry entry) throws IOException { while (true) { + // OriginalPosition might be < 0 on local fs; if so, it is useless to us. long originalPosition = this.inputStream.getPos(); - if (trailerPresent && originalPosition == this.walEditsStopOffset) return false; + if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) { + return false; + } WALKey.Builder builder = WALKey.newBuilder(); - int size = 0; + long size = 0; try { - int originalAvailable = this.inputStream.available(); + long available = -1; try { int firstByte = this.inputStream.read(); if (firstByte == -1) { throw new EOFException("First byte is negative"); } size = CodedInputStream.readRawVarint32(firstByte, this.inputStream); - if (this.inputStream.available() < size) { + // available may be < 0 on local fs for instance. If so, can't depend on it. + available = this.inputStream.available(); + if (available > 0 && available < size) { throw new EOFException("Available stream not enough for edit, " + "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= " + size); @@ -214,9 +219,8 @@ public class ProtobufLogReader extends ReaderBase { builder.mergeFrom(limitedInput); } catch (InvalidProtocolBufferException ipbe) { throw (EOFException) new EOFException("Invalid PB, EOF? 
Ignoring; originalPosition=" + - originalPosition + ", currentPosition=" + this.inputStream.getPos() + - ", messageSize=" + size + ", originalAvailable=" + originalAvailable + - ", currentAvailable=" + this.inputStream.available()).initCause(ipbe); + originalPosition + ", currentPosition=" + this.inputStream.getPos() + + ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe); } if (!builder.isInitialized()) { // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit. @@ -258,6 +262,10 @@ public class ProtobufLogReader extends ReaderBase { } } catch (EOFException eof) { LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof); + // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs) + if (originalPosition < 0) throw eof; + // Else restore our position to original location in hope that next time through we will + // read successfully. seekOnFs(originalPosition); return false; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java index 150e54e3129..e24995b7fcc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java @@ -69,7 +69,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool /** * Perform HLog.append() of Put object, for the number of iterations requested. - * Keys and Vaues are generated randomly, the number of column familes, + * Keys and Values are generated randomly, the number of column families, * qualifiers and key/value size is tunable by the user. 
*/ class HLogPutBenchmark implements Runnable { @@ -126,6 +126,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool boolean noSync = false; boolean verify = false; boolean verbose = false; + boolean cleanup = true; long roll = Long.MAX_VALUE; // Process command line args for (int i = 0; i < args.length; i++) { @@ -151,6 +152,8 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool verify = true; } else if (cmd.equals("-verbose")) { verbose = true; + } else if (cmd.equals("-nocleanup")) { + cleanup = false; } else if (cmd.equals("-roll")) { roll = Long.parseLong(args[++i]); } else if (cmd.equals("-h")) { @@ -205,8 +208,12 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool if (verify) { Path dir = ((FSHLog) hlog).getDir(); long editCount = 0; - for (FileStatus fss: fs.listStatus(dir)) { - editCount += verify(fss.getPath(), verbose); + FileStatus [] fsss = fs.listStatus(dir); + if (fsss.length == 0) throw new IllegalStateException("No WAL found"); + for (FileStatus fss: fsss) { + Path p = fss.getPath(); + if (!fs.exists(p)) throw new IllegalStateException(p.toString()); + editCount += verify(p, verbose); } long expected = numIterations * numThreads; if (editCount != expected) { @@ -216,7 +223,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool } finally { if (region != null) closeRegion(region); // Remove the root dir for this test region - cleanRegionRootDir(fs, rootRegionDir); + if (cleanup) cleanRegionRootDir(fs, rootRegionDir); } } finally { fs.close(); @@ -243,14 +250,16 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool * @throws IOException */ private long verify(final Path wal, final boolean verbose) throws IOException { - HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()), - wal, getConf()); + HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()), wal, 
getConf()); long previousSeqid = -1; long count = 0; try { while (true) { Entry e = reader.next(); - if (e == null) break; + if (e == null) { + LOG.debug("Read count=" + count + " from " + wal); + break; + } count++; long seqid = e.getKey().getLogSeqNum(); if (verbose) LOG.info("seqid=" + seqid); @@ -282,6 +291,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool System.err.println(" -qualifiers Number of qualifiers to write."); System.err.println(" -keySize Row key size in byte."); System.err.println(" -valueSize Row/Col value size in byte."); + System.err.println(" -nocleanup Do NOT remove test data when done."); System.err.println(" -nosync Append without syncing"); System.err.println(" -verify Verify edits written in sequence"); System.err.println(" -verbose Output extra info; e.g. all edit seq ids when verifying"); @@ -342,7 +352,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool Thread[] threads = new Thread[numThreads]; long startTime = System.currentTimeMillis(); for (int i = 0; i < numThreads; ++i) { - threads[i] = new Thread(runnable); + threads[i] = new Thread(runnable, "t" + i); threads[i].start(); } for (Thread t : threads) t.join(); @@ -355,7 +365,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool * Call this method to avoid the {@link #main(String[])} System.exit. * @param args * @return errCode - * @throws Exception + * @throws Exception */ static int innerMain(final String [] args) throws Exception { return ToolRunner.run(HBaseConfiguration.create(), new HLogPerformanceEvaluation(), args);