HBASE-9813 Log splitting doesn't prevent RS creating new hlog file

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1534785 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jxiang 2013-10-22 20:57:33 +00:00
parent 4c47c09a31
commit 9b93724dd8
14 changed files with 51 additions and 44 deletions

View File

@ -17,24 +17,21 @@
*/ */
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
/** /**
* A set of static functions for running our custom WAL compression/decompression. * A set of static functions for running our custom WAL compression/decompression.
@ -81,7 +78,7 @@ public class Compressor {
} }
boolean compress = ((ReaderBase)in).hasCompression(); boolean compress = ((ReaderBase)in).hasCompression();
conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress); conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
out = HLogFactory.createWriter(outFS, output, conf); out = HLogFactory.createWALWriter(outFS, output, conf);
HLog.Entry e = null; HLog.Entry e = null;
while ((e = in.next()) != null) out.append(e); while ((e = in.next()) != null) out.append(e);

View File

@ -563,7 +563,7 @@ class FSHLog implements HLog, Syncable {
if (forMeta) { if (forMeta) {
//TODO: set a higher replication for the hlog files (HBASE-6773) //TODO: set a higher replication for the hlog files (HBASE-6773)
} }
return HLogFactory.createWriter(fs, path, conf); return HLogFactory.createWALWriter(fs, path, conf);
} }
/* /*

View File

@ -33,11 +33,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
@ -95,7 +94,7 @@ public interface HLog {
} }
interface Writer { interface Writer {
void init(FileSystem fs, Path path, Configuration c) throws IOException; void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
void close() throws IOException; void close() throws IOException;
@ -173,6 +172,7 @@ public interface HLog {
} }
@Override @Override
@SuppressWarnings("deprecation")
public void write(DataOutput dataOutput) throws IOException { public void write(DataOutput dataOutput) throws IOException {
this.key.write(dataOutput); this.key.write(dataOutput);
this.edit.write(dataOutput); this.edit.write(dataOutput);

View File

@ -172,8 +172,18 @@ public class HLogFactory {
* @return A WAL writer. Close when done with it. * @return A WAL writer. Close when done with it.
* @throws IOException * @throws IOException
*/ */
public static HLog.Writer createWriter(final FileSystem fs, public static HLog.Writer createWALWriter(final FileSystem fs,
final Path path, Configuration conf) final Path path, Configuration conf) throws IOException {
return createWriter(fs, path, conf, false);
}
public static HLog.Writer createRecoveredEditsWriter(final FileSystem fs,
final Path path, Configuration conf) throws IOException {
return createWriter(fs, path, conf, true);
}
private static HLog.Writer createWriter(final FileSystem fs,
final Path path, Configuration conf, boolean overwritable)
throws IOException { throws IOException {
try { try {
if (logWriterClass == null) { if (logWriterClass == null) {
@ -181,11 +191,10 @@ public class HLogFactory {
ProtobufLogWriter.class, Writer.class); ProtobufLogWriter.class, Writer.class);
} }
HLog.Writer writer = (HLog.Writer)logWriterClass.newInstance(); HLog.Writer writer = (HLog.Writer)logWriterClass.newInstance();
writer.init(fs, path, conf); writer.init(fs, path, conf, overwritable);
return writer; return writer;
} catch (Exception e) { } catch (Exception e) {
throw new IOException("cannot get log writer", e); throw new IOException("cannot get log writer", e);
} }
} }
} }

View File

@ -634,7 +634,7 @@ public class HLogSplitter {
*/ */
protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf) protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
throws IOException { throws IOException {
return HLogFactory.createWriter(fs, logfile, conf); return HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);
} }
/** /**

View File

@ -28,12 +28,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FSUtils;
/** /**
* Writer for protobuf-based WAL. * Writer for protobuf-based WAL.
@ -55,7 +54,8 @@ public class ProtobufLogWriter extends WriterBase {
} }
@Override @Override
public void init(FileSystem fs, Path path, Configuration conf) throws IOException { @SuppressWarnings("deprecation")
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) throws IOException {
assert this.output == null; assert this.output == null;
boolean doCompress = initializeCompressionContext(conf, path); boolean doCompress = initializeCompressionContext(conf, path);
this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE, this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
@ -65,7 +65,7 @@ public class ProtobufLogWriter extends WriterBase {
"hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path)); "hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path));
long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize", long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize",
FSUtils.getDefaultBlockSize(fs, path)); FSUtils.getDefaultBlockSize(fs, path));
output = fs.create(path, true, bufferSize, replication, blockSize); output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
output.write(ProtobufLogReader.PB_WAL_MAGIC); output.write(ProtobufLogReader.PB_WAL_MAGIC);
WALHeader.newBuilder().setHasCompression(doCompress).build().writeDelimitedTo(output); WALHeader.newBuilder().setHasCompression(doCompress).build().writeDelimitedTo(output);

View File

@ -66,7 +66,7 @@ class SnapshotLogSplitter implements Closeable {
public LogWriter(final Configuration conf, final FileSystem fs, public LogWriter(final Configuration conf, final FileSystem fs,
final Path logDir, long seqId) throws IOException { final Path logDir, long seqId) throws IOException {
logFile = new Path(logDir, logFileName(seqId, true)); logFile = new Path(logDir, logFileName(seqId, true));
this.writer = HLogFactory.createWriter(fs, logFile, conf); this.writer = HLogFactory.createRecoveredEditsWriter(fs, logFile, conf);
this.seqId = seqId; this.seqId = seqId;
} }

View File

@ -293,7 +293,7 @@ public class TestHRegion {
for (long i = minSeqId; i <= maxSeqId; i += 10) { for (long i = minSeqId; i <= maxSeqId; i += 10) {
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
fs.create(recoveredEdits); fs.create(recoveredEdits);
HLog.Writer writer = HLogFactory.createWriter(fs, recoveredEdits, conf); HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, conf);
long time = System.nanoTime(); long time = System.nanoTime();
WALEdit edit = new WALEdit(); WALEdit edit = new WALEdit();
@ -343,7 +343,7 @@ public class TestHRegion {
for (long i = minSeqId; i <= maxSeqId; i += 10) { for (long i = minSeqId; i <= maxSeqId; i += 10) {
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
fs.create(recoveredEdits); fs.create(recoveredEdits);
HLog.Writer writer = HLogFactory.createWriter(fs, recoveredEdits, conf); HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, conf);
long time = System.nanoTime(); long time = System.nanoTime();
WALEdit edit = new WALEdit(); WALEdit edit = new WALEdit();
@ -468,7 +468,7 @@ public class TestHRegion {
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
fs.create(recoveredEdits); fs.create(recoveredEdits);
HLog.Writer writer = HLogFactory.createWriter(fs, recoveredEdits, conf); HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, conf);
long time = System.nanoTime(); long time = System.nanoTime();

View File

@ -31,13 +31,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata; import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.io.compress.DefaultCodec;
@ -84,7 +83,7 @@ public class SequenceFileLogWriter extends WriterBase {
} }
@Override @Override
public void init(FileSystem fs, Path path, Configuration conf) public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
throws IOException { throws IOException {
boolean compress = initializeCompressionContext(conf, path); boolean compress = initializeCompressionContext(conf, path);

View File

@ -795,7 +795,7 @@ public class TestHLog {
fs.mkdirs(dir); fs.mkdirs(dir);
// Write log in pre-PB format. // Write log in pre-PB format.
sflw = new SequenceFileLogWriter(); sflw = new SequenceFileLogWriter();
sflw.init(fs, path, conf); sflw.init(fs, path, conf, false);
for (int i = 0; i < recordCount; ++i) { for (int i = 0; i < recordCount; ++i) {
HLogKey key = new HLogKey( HLogKey key = new HLogKey(
hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID); hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
@ -870,6 +870,8 @@ public class TestHLog {
final byte[] row = Bytes.toBytes("row"); final byte[] row = Bytes.toBytes("row");
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
Path path = new Path(dir, "temphlog"); Path path = new Path(dir, "temphlog");
// delete the log if already exists, for test only
fs.delete(path, true);
HLog.Writer writer = null; HLog.Writer writer = null;
HLog.Reader reader = null; HLog.Reader reader = null;
try { try {
@ -878,7 +880,7 @@ public class TestHLog {
HTableDescriptor htd = new HTableDescriptor(tableName); HTableDescriptor htd = new HTableDescriptor(tableName);
fs.mkdirs(dir); fs.mkdirs(dir);
// Write log in pb format. // Write log in pb format.
writer = HLogFactory.createWriter(fs, path, conf); writer = HLogFactory.createWALWriter(fs, path, conf);
for (int i = 0; i < recordCount; ++i) { for (int i = 0; i < recordCount; ++i) {
HLogKey key = new HLogKey( HLogKey key = new HLogKey(
hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID); hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);

View File

@ -376,7 +376,7 @@ public class TestHLogSplit {
Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true); Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
String parentOfParent = p.getParent().getParent().getName(); String parentOfParent = p.getParent().getParent().getName();
assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
HLogFactory.createWriter(fs, p, conf).close(); HLogFactory.createRecoveredEditsWriter(fs, p, conf).close();
} }
@Test (timeout=300000) @Test (timeout=300000)
@ -1168,7 +1168,7 @@ public class TestHLogSplit {
} }
fs.mkdirs(new Path(tableDir, region)); fs.mkdirs(new Path(tableDir, region));
HLog.Writer writer = HLogFactory.createWriter(fs, HLog.Writer writer = HLogFactory.createWALWriter(fs,
julietLog, conf); julietLog, conf);
appendEntry(writer, TableName.valueOf("juliet"), ("juliet").getBytes(), appendEntry(writer, TableName.valueOf("juliet"), ("juliet").getBytes(),
("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0); ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0);
@ -1289,7 +1289,7 @@ public class TestHLogSplit {
conf, HBASEDIR, fs, null, null) { conf, HBASEDIR, fs, null, null) {
protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
throws IOException { throws IOException {
HLog.Writer writer = HLogFactory.createWriter(fs, logfile, conf); HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);
// After creating writer, simulate region's // After creating writer, simulate region's
// replayRecoveredEditsIfAny() which gets SplitEditFiles of this // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
// region and delete them, excluding files with '.temp' suffix. // region and delete them, excluding files with '.temp' suffix.
@ -1350,7 +1350,7 @@ public class TestHLogSplit {
HLog.Writer [] ws = new HLog.Writer[writers]; HLog.Writer [] ws = new HLog.Writer[writers];
int seq = 0; int seq = 0;
for (int i = 0; i < writers; i++) { for (int i = 0; i < writers; i++) {
ws[i] = HLogFactory.createWriter(dfs, new Path(HLOGDIR, HLOG_FILE_PREFIX + i), dfs.getConf()); ws[i] = HLogFactory.createWALWriter(dfs, new Path(HLOGDIR, HLOG_FILE_PREFIX + i), dfs.getConf());
for (int j = 0; j < entries; j++) { for (int j = 0; j < entries; j++) {
int prefix = 0; int prefix = 0;
for (String region : REGIONS) { for (String region : REGIONS) {
@ -1505,7 +1505,7 @@ public class TestHLogSplit {
private void injectEmptyFile(String suffix, boolean closeFile) private void injectEmptyFile(String suffix, boolean closeFile)
throws IOException { throws IOException {
HLog.Writer writer = HLogFactory.createWriter( HLog.Writer writer = HLogFactory.createWALWriter(
fs, new Path(HLOGDIR, HLOG_FILE_PREFIX + suffix), conf); fs, new Path(HLOGDIR, HLOG_FILE_PREFIX + suffix), conf);
if (closeFile) writer.close(); if (closeFile) writer.close();
} }

View File

@ -112,7 +112,7 @@ public class TestReadOldRootAndMetaEdits {
HConstants.DEFAULT_CLUSTER_ID), kvs); HConstants.DEFAULT_CLUSTER_ID), kvs);
// write above entries // write above entries
writer = HLogFactory.createWriter(fs, path, conf); writer = HLogFactory.createWALWriter(fs, path, conf);
writer.append(tEntry); writer.append(tEntry);
writer.append(rootEntry); writer.append(rootEntry);
writer.append(oldMetaEntry); writer.append(oldMetaEntry);

View File

@ -79,7 +79,7 @@ public class TestReplicationSource {
Path logPath = new Path(logDir, "log"); Path logPath = new Path(logDir, "log");
if (!FS.exists(logDir)) FS.mkdirs(logDir); if (!FS.exists(logDir)) FS.mkdirs(logDir);
if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir); if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
HLog.Writer writer = HLogFactory.createWriter(FS, HLog.Writer writer = HLogFactory.createWALWriter(FS,
logPath, conf); logPath, conf);
for(int i = 0; i < 3; i++) { for(int i = 0; i < 3; i++) {
byte[] b = Bytes.toBytes(Integer.toString(i)); byte[] b = Bytes.toBytes(Integer.toString(i));

View File

@ -144,7 +144,7 @@ public class TestSnapshotLogSplitter {
*/ */
private void writeTestLog(final Path logFile) throws IOException { private void writeTestLog(final Path logFile) throws IOException {
fs.mkdirs(logFile.getParent()); fs.mkdirs(logFile.getParent());
HLog.Writer writer = HLogFactory.createWriter(fs, logFile, conf); HLog.Writer writer = HLogFactory.createWALWriter(fs, logFile, conf);
try { try {
for (int i = 0; i < 7; ++i) { for (int i = 0; i < 7; ++i) {
TableName tableName = getTableName(i); TableName tableName = getTableName(i);