HBASE-5937 Refactor HLog into an interface

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1393126 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-10-02 19:29:19 +00:00
parent e0538fe83b
commit ac368c1ae8
54 changed files with 2792 additions and 2286 deletions

View File

@ -44,7 +44,7 @@ public class LongTermArchivingHFileCleaner extends BaseHFileCleanerDelegate {
private static final Log LOG = LogFactory.getLog(LongTermArchivingHFileCleaner.class);
private TableHFileArchiveTracker archiveTracker;
TableHFileArchiveTracker archiveTracker;
private FileSystem fs;
@Override

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@ -316,7 +317,7 @@ public class HFileSystem extends FilterFileSystem {
public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
throws IOException {
ServerName sn = HLog.getServerNameFromHLogDirectoryName(conf, src);
ServerName sn = HLogUtil.getServerNameFromHLogDirectoryName(conf, src);
if (sn == null) {
// It's not an HLOG
return;

View File

@ -33,7 +33,9 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
@ -143,7 +145,8 @@ public class HLogInputFormat extends InputFormat<HLogKey, WALEdit> {
Configuration conf = context.getConfiguration();
LOG.info("Opening reader for "+split);
try {
this.reader = HLog.getReader(logFile.getFileSystem(conf), logFile, conf);
this.reader = HLogFactory.createReader(logFile.getFileSystem(conf),
logFile, conf);
} catch (EOFException x) {
LOG.info("Ignoring corrupted HLog file: " + logFile
+ " (This is normal when a RegionServer crashed.)");

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
@ -82,7 +83,7 @@ public class WALPlayer extends Configured implements Tool {
// skip all other tables
if (Bytes.equals(table, key.getTablename())) {
for (KeyValue kv : value.getKeyValues()) {
if (HLog.isMetaFamily(kv.getFamily())) continue;
if (HLogUtil.isMetaFamily(kv.getFamily())) continue;
context.write(new ImmutableBytesWritable(kv.getRow()), kv);
}
}
@ -126,7 +127,7 @@ public class WALPlayer extends Configured implements Tool {
KeyValue lastKV = null;
for (KeyValue kv : value.getKeyValues()) {
// filtering HLog meta entries, see HLog.completeCacheFlushLogEdit
if (HLog.isMetaFamily(kv.getFamily())) continue;
if (HLogUtil.isMetaFamily(kv.getFamily())) continue;
// A WALEdit may contain multiple operations (HBASE-3584) and/or
// multiple rows (HBASE-5229).

View File

@ -47,7 +47,9 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -262,7 +264,7 @@ public class MasterFileSystem {
long splitTime = 0, splitLogSize = 0;
List<Path> logDirs = new ArrayList<Path>();
for (ServerName serverName: serverNames) {
Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName.toString()));
Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
// Rename the directory so a rogue RS doesn't create more HLogs
if (fs.exists(logDir)) {

View File

@ -43,7 +43,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Chore
private final FileSystem fs;
private final Path oldFileDir;
private final Configuration conf;
private List<T> cleanersChain;
protected List<T> cleanersChain;
/**
* @param name name of the chore being run

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.master.cleaner;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -52,4 +54,11 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
}
return StoreFile.validateStoreFileName(file.getName());
}
/**
* Exposed for TESTING!
*/
public List<BaseHFileCleanerDelegate> getDelegatesForTesting() {
return this.cleanersChain;
}
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
/**
* This Chore, every time it runs, will attempt to delete the HLogs in the old logs folder. The HLog
@ -51,6 +52,6 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
@Override
protected boolean validate(Path file) {
return HLog.validateHLogFilename(file.getName());
return HLogUtil.validateHLogFilename(file.getName());
}
}

View File

@ -116,7 +116,6 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

View File

@ -122,7 +122,9 @@ import org.apache.hadoop.hbase.regionserver.metrics.OperationMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
@ -2739,7 +2741,8 @@ public class HRegion implements HeapSize { // , Writable{
}
}
long seqid = minSeqIdForTheRegion;
NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
if (files == null || files.isEmpty()) return seqid;
for (Path edits: files) {
@ -2774,7 +2777,7 @@ public class HRegion implements HeapSize { // , Writable{
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
}
if (skipErrors) {
Path p = HLog.moveAsideBadEditsFile(fs, edits);
Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+ "=true so continuing. Renamed " + edits +
" as " + p, e);
@ -2822,7 +2825,7 @@ public class HRegion implements HeapSize { // , Writable{
status.setStatus("Opening logs");
HLog.Reader reader = null;
try {
reader = HLog.getReader(this.fs, edits, conf);
reader = HLogFactory.createReader(this.fs, edits, conf);
long currentEditSeqId = -1;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
@ -2922,7 +2925,7 @@ public class HRegion implements HeapSize { // , Writable{
}
}
} catch (EOFException eof) {
Path p = HLog.moveAsideBadEditsFile(fs, edits);
Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
msg = "Encountered EOF. Most likely due to Master failure during " +
"log spliting, so we have this data in another edit. " +
"Continuing, but renaming " + edits + " as " + p;
@ -2932,7 +2935,7 @@ public class HRegion implements HeapSize { // , Writable{
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
if (ioe.getCause() instanceof ParseException) {
Path p = HLog.moveAsideBadEditsFile(fs, edits);
Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
msg = "File corruption encountered! " +
"Continuing, but renaming " + edits + " as " + p;
LOG.warn(msg, ioe);
@ -3734,8 +3737,8 @@ public class HRegion implements HeapSize { // , Writable{
fs.mkdirs(regionDir);
HLog effectiveHLog = hlog;
if (hlog == null && !ignoreHLog) {
effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf);
effectiveHLog = HLogFactory.createHLog(fs, regionDir,
HConstants.HREGION_LOGDIR_NAME, conf);
}
HRegion region = HRegion.newHRegion(tableDir,
effectiveHLog, fs, conf, info, hTableDescriptor, null);
@ -5455,12 +5458,11 @@ public class HRegion implements HeapSize { // , Writable{
final Path tableDir = new Path(args[0]);
final Configuration c = HBaseConfiguration.create();
final FileSystem fs = FileSystem.get(c);
final Path logdir = new Path(c.get("hbase.tmp.dir"),
"hlog" + tableDir.getName()
+ EnvironmentEdgeManager.currentTimeMillis());
final Path oldLogDir = new Path(c.get("hbase.tmp.dir"),
HConstants.HREGION_OLDLOGDIR_NAME);
final HLog log = new HLog(fs, logdir, oldLogDir, c);
final Path logdir = new Path(c.get("hbase.tmp.dir"));
final String logname = "hlog" + tableDir.getName()
+ EnvironmentEdgeManager.currentTimeMillis();
final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
try {
processTable(fs, tableDir, log, c, majorCompact);
} finally {

View File

@ -196,6 +196,8 @@ import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@ -1379,8 +1381,10 @@ public class HRegionServer implements ClientProtocol,
*/
private HLog setupWALAndReplication() throws IOException {
final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path logdir = new Path(rootDir,
HLog.getHLogDirectoryName(this.serverNameFromMasterPOV.toString()));
final String logName
= HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString());
Path logdir = new Path(rootDir, logName);
if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
if (this.fs.exists(logdir)) {
throw new RegionServerRunningException("Region server has already " +
@ -1390,7 +1394,8 @@ public class HRegionServer implements ClientProtocol,
// Instantiate replication manager if replication enabled. Pass it the
// log directories.
createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
return instantiateHLog(logdir, oldLogDir);
return instantiateHLog(rootDir, logName);
}
/**
@ -1400,8 +1405,8 @@ public class HRegionServer implements ClientProtocol,
* @return WAL instance.
* @throws IOException
*/
protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
return new HLog(this.fs.getBackingFs(), logdir, oldLogDir, this.conf,
protected HLog instantiateHLog(Path rootdir, String logName) throws IOException {
return HLogFactory.createHLog(this.fs.getBackingFs(), rootdir, logName, this.conf,
getWALActionListeners(), this.serverNameFromMasterPOV.toString());
}
@ -1409,7 +1414,7 @@ public class HRegionServer implements ClientProtocol,
* Called by {@link #instantiateHLog(Path, Path)} setting up WAL instance.
* Add any {@link WALActionsListener}s you want inserted before WAL startup.
* @return List of WALActionsListener that will be passed in to
* {@link HLog} on construction.
* {@link FSHLog} on construction.
*/
protected List<WALActionsListener> getWALActionListeners() {
List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();

View File

@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.metrics.MetricsRate;
import org.apache.hadoop.hbase.metrics.histogram.MetricsHistogram;
import org.apache.hadoop.hbase.metrics.PersistentMetricsTimeVaryingRate;
import com.yammer.metrics.stats.Snapshot;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogMetrics;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.metrics.ContextFactory;
@ -399,11 +399,11 @@ public class RegionServerMetrics implements Updater {
// }
// Means you can't pass a numOps of zero or get a ArithmeticException / by zero.
// HLog metrics
addHLogMetric(HLog.getWriteTime(), this.fsWriteLatency);
addHLogMetric(HLog.getWriteSize(), this.fsWriteSize);
addHLogMetric(HLog.getSyncTime(), this.fsSyncLatency);
addHLogMetric(HLog.getSlowAppendTime(), this.slowHLogAppendTime);
this.slowHLogAppendCount.set(HLog.getSlowAppendCount());
addHLogMetric(HLogMetrics.getWriteTime(), this.fsWriteLatency);
addHLogMetric(HLogMetrics.getWriteSize(), this.fsWriteSize);
addHLogMetric(HLogMetrics.getSyncTime(), this.fsSyncLatency);
addHLogMetric(HLogMetrics.getSlowAppendTime(), this.slowHLogAppendTime);
this.slowHLogAppendCount.set(HLogMetrics.getSlowAppendCount());
// HFile metrics, sequential reads
int ops = HFile.getReadOps();
if (ops != 0) this.fsReadLatency.inc(ops, HFile.getReadTimeMs());
@ -453,7 +453,7 @@ public class RegionServerMetrics implements Updater {
this.metricsRecord.update();
}
private void addHLogMetric(HLog.Metric logMetric,
private void addHLogMetric(HLogMetrics.Metric logMetric,
MetricsTimeVaryingRate hadoopMetric) {
if (logMetric.count > 0)
hadoopMetric.inc(logMetric.min);

View File

@ -25,7 +25,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.conf.Configuration;
@ -78,7 +78,7 @@ public class Compressor {
conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
out.init(outFS, output, conf);
Entry e = null;
HLog.Entry e = null;
while ((e = in.next()) != null) out.append(e);
} finally {
in.close();

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,119 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
public class HLogFactory {
private static final Log LOG = LogFactory.getLog(HLogFactory.class);
public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
final Configuration conf) throws IOException {
return new FSHLog(fs, root, logName, conf);
}
public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
final String oldLogName, final Configuration conf) throws IOException {
return new FSHLog(fs, root, logName, oldLogName, conf);
}
public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
final Configuration conf, final List<WALActionsListener> listeners,
final String prefix) throws IOException {
return new FSHLog(fs, root, logName, conf, listeners, prefix);
}
/*
* WAL Reader
*/
private static Class<? extends Reader> logReaderClass;
static void resetLogReaderClass() {
logReaderClass = null;
}
/**
* Create a reader for the WAL.
* @return A WAL reader. Close when done with it.
* @throws IOException
*/
public static HLog.Reader createReader(final FileSystem fs,
final Path path, Configuration conf)
throws IOException {
try {
if (logReaderClass == null) {
logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
SequenceFileLogReader.class, Reader.class);
}
HLog.Reader reader = logReaderClass.newInstance();
reader.init(fs, path, conf);
return reader;
} catch (IOException e) {
throw e;
}
catch (Exception e) {
throw new IOException("Cannot get log reader", e);
}
}
/*
* WAL writer
*/
private static Class<? extends Writer> logWriterClass;
/**
* Create a writer for the WAL.
* @return A WAL writer. Close when done with it.
* @throws IOException
*/
public static HLog.Writer createWriter(final FileSystem fs,
final Path path, Configuration conf)
throws IOException {
try {
if (logWriterClass == null) {
logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
SequenceFileLogWriter.class, Writer.class);
}
HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance();
writer.init(fs, path, conf);
return writer;
} catch (Exception e) {
throw new IOException("cannot get log writer", e);
}
}
}

View File

@ -0,0 +1,83 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.util.concurrent.atomic.AtomicLong;
public class HLogMetrics {
public static class Metric {
public long min = Long.MAX_VALUE;
public long max = 0;
public long total = 0;
public int count = 0;
synchronized void inc(final long val) {
min = Math.min(min, val);
max = Math.max(max, val);
total += val;
++count;
}
synchronized Metric get() {
Metric copy = new Metric();
copy.min = min;
copy.max = max;
copy.total = total;
copy.count = count;
this.min = Long.MAX_VALUE;
this.max = 0;
this.total = 0;
this.count = 0;
return copy;
}
}
// For measuring latency of writes
static Metric writeTime = new Metric();
static Metric writeSize = new Metric();
// For measuring latency of syncs
static Metric syncTime = new Metric();
//For measuring slow HLog appends
static AtomicLong slowHLogAppendCount = new AtomicLong();
static Metric slowHLogAppendTime = new Metric();
public static Metric getWriteTime() {
return writeTime.get();
}
public static Metric getWriteSize() {
return writeSize.get();
}
public static Metric getSyncTime() {
return syncTime.get();
}
public static long getSlowAppendCount() {
return slowHLogAppendCount.get();
}
public static Metric getSlowAppendTime() {
return slowHLogAppendTime.get();
}
}

View File

@ -240,9 +240,9 @@ public class HLogPrettyPrinter {
out.print("[");
firstTxn = true;
}
Reader log = HLog.getReader(fs, p, conf);
Reader log = HLogFactory.createReader(fs, p, conf);
try {
HLog.Entry entry;
FSHLog.Entry entry;
while ((entry = log.next()) != null) {
HLogKey key = entry.getKey();
WALEdit edit = entry.getEdit();

View File

@ -563,7 +563,7 @@ public class HLogSplitter {
}
for (Path p : processedLogs) {
Path newPath = HLog.getHLogArchivePath(oldLogDir, p);
Path newPath = FSHLog.getHLogArchivePath(oldLogDir, p);
if (fs.exists(p)) {
if (!fs.rename(p, newPath)) {
LOG.warn("Unable to move " + p + " to " + newPath);
@ -598,7 +598,7 @@ public class HLogSplitter {
Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
Path regiondir = HRegion.getRegionDir(tableDir,
Bytes.toString(logEntry.getKey().getEncodedRegionName()));
Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir);
Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
if (!fs.exists(regiondir)) {
LOG.info("This region's directory doesn't exist: "
@ -777,7 +777,7 @@ public class HLogSplitter {
*/
protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
throws IOException {
return HLog.createWriter(fs, logfile, conf);
return HLogFactory.createWriter(fs, logfile, conf);
}
/**
@ -785,7 +785,7 @@ public class HLogSplitter {
*/
protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
throws IOException {
return HLog.getReader(fs, curLogFile, conf);
return HLogFactory.createReader(fs, curLogFile, conf);
}
/**

View File

@ -0,0 +1,324 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
public class HLogUtil {
static final Log LOG = LogFactory.getLog(HLogUtil.class);
static byte[] COMPLETE_CACHE_FLUSH;
static {
try {
COMPLETE_CACHE_FLUSH = "HBASE::CACHEFLUSH"
.getBytes(HConstants.UTF8_ENCODING);
} catch (UnsupportedEncodingException e) {
assert (false);
}
}
/**
* @param family
* @return true if the column is a meta column
*/
public static boolean isMetaFamily(byte[] family) {
return Bytes.equals(HLog.METAFAMILY, family);
}
@SuppressWarnings("unchecked")
public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
return (Class<? extends HLogKey>) conf.getClass(
"hbase.regionserver.hlog.keyclass", HLogKey.class);
}
public static HLogKey newKey(Configuration conf) throws IOException {
Class<? extends HLogKey> keyClass = getKeyClass(conf);
try {
return keyClass.newInstance();
} catch (InstantiationException e) {
throw new IOException("cannot create hlog key");
} catch (IllegalAccessException e) {
throw new IOException("cannot create hlog key");
}
}
/**
* Pattern used to validate a HLog file name
*/
private static final Pattern pattern = Pattern.compile(".*\\.\\d*");
/**
* @param filename
* name of the file to validate
* @return <tt>true</tt> if the filename matches an HLog, <tt>false</tt>
* otherwise
*/
public static boolean validateHLogFilename(String filename) {
return pattern.matcher(filename).matches();
}
/*
* Get a reader for the WAL.
*
* @param fs
*
* @param path
*
* @param conf
*
* @return A WAL reader. Close when done with it.
*
* @throws IOException
*
* public static HLog.Reader getReader(final FileSystem fs, final Path path,
* Configuration conf) throws IOException { try {
*
* if (logReaderClass == null) {
*
* logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
* SequenceFileLogReader.class, Reader.class); }
*
*
* HLog.Reader reader = logReaderClass.newInstance(); reader.init(fs, path,
* conf); return reader; } catch (IOException e) { throw e; } catch (Exception
* e) { throw new IOException("Cannot get log reader", e); } }
*
* * Get a writer for the WAL.
*
* @param path
*
* @param conf
*
* @return A WAL writer. Close when done with it.
*
* @throws IOException
*
* public static HLog.Writer createWriter(final FileSystem fs, final Path
* path, Configuration conf) throws IOException { try { if (logWriterClass ==
* null) { logWriterClass =
* conf.getClass("hbase.regionserver.hlog.writer.impl",
* SequenceFileLogWriter.class, Writer.class); } FSHLog.Writer writer =
* (FSHLog.Writer) logWriterClass.newInstance(); writer.init(fs, path, conf);
* return writer; } catch (Exception e) { throw new
* IOException("cannot get log writer", e); } }
*/
/**
* Construct the HLog directory name
*
* @param serverName
* Server name formatted as described in {@link ServerName}
* @return the relative HLog directory name, e.g.
* <code>.logs/1.example.org,60030,12345</code> if
* <code>serverName</code> passed is
* <code>1.example.org,60030,12345</code>
*/
public static String getHLogDirectoryName(final String serverName) {
StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
dirName.append("/");
dirName.append(serverName);
return dirName.toString();
}
/**
* @param regiondir
* This regions directory in the filesystem.
* @return The directory that holds recovered edits files for the region
* <code>regiondir</code>
*/
public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
return new Path(regiondir, HLog.RECOVERED_EDITS_DIR);
}
/**
* Move aside a bad edits file.
*
* @param fs
* @param edits
* Edits file to move aside.
* @return The name of the moved aside file.
* @throws IOException
*/
public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
throws IOException {
Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
+ System.currentTimeMillis());
if (!fs.rename(edits, moveAsideName)) {
LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
}
return moveAsideName;
}
/**
* @param path
* - the path to analyze. Expected format, if it's in hlog directory:
* / [base directory for hbase] / hbase / .logs / ServerName /
* logfile
* @return null if it's not a log file. Returns the ServerName of the region
* server that created this log file otherwise.
*/
public static ServerName getServerNameFromHLogDirectoryName(
Configuration conf, String path) throws IOException {
if (path == null
|| path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
return null;
}
if (conf == null) {
throw new IllegalArgumentException("parameter conf must be set");
}
final String rootDir = conf.get(HConstants.HBASE_DIR);
if (rootDir == null || rootDir.isEmpty()) {
throw new IllegalArgumentException(HConstants.HBASE_DIR
+ " key not found in conf.");
}
final StringBuilder startPathSB = new StringBuilder(rootDir);
if (!rootDir.endsWith("/"))
startPathSB.append('/');
startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/"))
startPathSB.append('/');
final String startPath = startPathSB.toString();
String fullPath;
try {
fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
} catch (IllegalArgumentException e) {
LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
return null;
}
if (!fullPath.startsWith(startPath)) {
return null;
}
final String serverNameAndFile = fullPath.substring(startPath.length());
if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
// Either it's a file (not a directory) or it's not a ServerName format
return null;
}
final String serverName = serverNameAndFile.substring(0,
serverNameAndFile.indexOf('/') - 1);
if (!ServerName.isFullServerName(serverName)) {
return null;
}
return ServerName.parseServerName(serverName);
}
/**
* Return regions (memstores) that have edits that are equal or less than the
* passed <code>oldestWALseqid</code>.
*
* @param oldestWALseqid
* @param regionsToSeqids
* Encoded region names to sequence ids
* @return All regions whose seqid is < than <code>oldestWALseqid</code> (Not
* necessarily in order). Null if no regions found.
*/
static byte[][] findMemstoresWithEditsEqualOrOlderThan(
final long oldestWALseqid, final Map<byte[], Long> regionsToSeqids) {
// This method is static so it can be unit tested the easier.
List<byte[]> regions = null;
for (Map.Entry<byte[], Long> e : regionsToSeqids.entrySet()) {
if (e.getValue().longValue() <= oldestWALseqid) {
if (regions == null)
regions = new ArrayList<byte[]>();
// Key is encoded region name.
regions.add(e.getKey());
}
}
return regions == null ? null : regions
.toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
}
/**
* Returns sorted set of edit files made by wal-log splitter, excluding files
* with '.temp' suffix.
*
* @param fs
* @param regiondir
* @return Files in passed <code>regiondir</code> as a sorted set.
* @throws IOException
*/
public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
final Path regiondir) throws IOException {
NavigableSet<Path> filesSorted = new TreeSet<Path>();
Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
if (!fs.exists(editsdir))
return filesSorted;
FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
@Override
public boolean accept(Path p) {
boolean result = false;
try {
// Return files and only files that match the editfile names pattern.
// There can be other files in this directory other than edit files.
// In particular, on error, we'll move aside the bad edit file giving
// it a timestamp suffix. See moveAsideBadEditsFile.
Matcher m = HLog.EDITFILES_NAME_PATTERN.matcher(p.getName());
result = fs.isFile(p) && m.matches();
// Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
// because it means splithlog thread is writting this file.
if (p.getName().endsWith(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)) {
result = false;
}
} catch (IOException e) {
LOG.warn("Failed isFile check on " + p);
}
return result;
}
});
if (files == null)
return filesSorted;
for (FileStatus status : files) {
filesSorted.add(status.getPath());
}
return filesSorted;
}
}

View File

@ -214,7 +214,7 @@ public class SequenceFileLogReader implements HLog.Reader {
if (e == null) {
HLogKey key;
if (keyClass == null) {
key = HLog.newKey(conf);
key = HLogUtil.newKey(conf);
} else {
try {
key = keyClass.newInstance();

View File

@ -40,7 +40,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
/**
* Implementation of {@link HLog.Writer} that delegates to
* Implementation of {@link FSHLog.Writer} that delegates to
* SequenceFile.Writer.
*/
@InterfaceAudience.Private
@ -140,7 +140,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
}
if (null == keyClass) {
keyClass = HLog.getKeyClass(conf);
keyClass = HLogUtil.getKeyClass(conf);
}
// Create a SF.Writer instance.
@ -152,7 +152,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
Configuration.class, Path.class, Class.class, Class.class,
Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
CompressionType.class, CompressionCodec.class, Metadata.class})
.invoke(null, new Object[] {fs, conf, path, HLog.getKeyClass(conf),
.invoke(null, new Object[] {fs, conf, path, HLogUtil.getKeyClass(conf),
WALEdit.class,
Integer.valueOf(fs.getConf().getInt("io.file.buffer.size", 4096)),
Short.valueOf((short)
@ -175,7 +175,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
if (this.writer == null) {
LOG.debug("new createWriter -- HADOOP-6840 -- not available");
this.writer = SequenceFile.createWriter(fs, conf, path,
HLog.getKeyClass(conf), WALEdit.class,
HLogUtil.getKeyClass(conf), WALEdit.class,
fs.getConf().getInt("io.file.buffer.size", 4096),
(short) conf.getInt("hbase.regionserver.hlog.replication",
fs.getDefaultReplication()),

View File

@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
/**
* Get notification of {@link HLog}/WAL log events. The invocations are inline
* Get notification of {@link FSHLog}/WAL log events. The invocations are inline
* so make sure your implementation is fast else you'll slow hbase.
*/
@InterfaceAudience.Private

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.conf.Configuration;
/**
* Implements the coprocessor environment and runtime support for coprocessors
* loaded within a {@link HLog}.
* loaded within a {@link FSHLog}.
*/
@InterfaceAudience.Private
public class WALCoprocessorHost
@ -42,10 +42,10 @@ public class WALCoprocessorHost
static class WALEnvironment extends CoprocessorHost.Environment
implements WALCoprocessorEnvironment {
private HLog wal;
private FSHLog wal;
@Override
public HLog getWAL() {
public FSHLog getWAL() {
return wal;
}
@ -59,19 +59,19 @@ public class WALCoprocessorHost
*/
public WALEnvironment(Class<?> implClass, final Coprocessor impl,
final int priority, final int seq, final Configuration conf,
final HLog hlog) {
final FSHLog hlog) {
super(impl, priority, seq, conf);
this.wal = hlog;
}
}
HLog wal;
FSHLog wal;
/**
* Constructor
* @param log the write ahead log
* @param conf the configuration
*/
public WALCoprocessorHost(final HLog log, final Configuration conf) {
public WALCoprocessorHost(final FSHLog log, final Configuration conf) {
this.wal = log;
// load system default cp's from configuration.
loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY);

View File

@ -52,10 +52,13 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationSourceMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@ -393,7 +396,7 @@ public class ReplicationSource extends Thread
this.reader.seek(this.position);
}
long startPosition = this.position;
HLog.Entry entry = readNextAndSetPosition();
HLog.Entry entry = readNextAndSetPosition();
while (entry != null) {
WALEdit edit = entry.getEdit();
this.metrics.incrLogEditsRead();
@ -493,7 +496,8 @@ public class ReplicationSource extends Thread
" at " + this.position);
try {
this.reader = null;
this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
this.reader = HLogFactory.createReader(this.fs,
this.currentPath, this.conf);
} catch (FileNotFoundException fnfe) {
if (this.queueRecovered) {
// We didn't find the log in the archive directory, look if it still

View File

@ -84,8 +84,9 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
@ -2999,7 +3000,7 @@ public class HBaseFsck {
// This is special case if a region is left after split
he.hdfsOnlyEdits = true;
FileStatus[] subDirs = fs.listStatus(regionDir.getPath());
Path ePath = HLog.getRegionDirRecoveredEditsDir(regionDir.getPath());
Path ePath = HLogUtil.getRegionDirRecoveredEditsDir(regionDir.getPath());
for (FileStatus subDir : subDirs) {
String sdName = subDir.getPath().getName();
if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) {

View File

@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
* Helper class for all utilities related to archival/retrieval of HFiles
*/
public class HFileArchiveUtil {
static final String DEFAULT_HFILE_ARCHIVE_DIRECTORY = ".archive";
public static final String DEFAULT_HFILE_ARCHIVE_DIRECTORY = ".archive";
private HFileArchiveUtil() {
// non-external instantiation - util class

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
/**
* A non-instantiable class that has a static method capable of compacting
@ -155,10 +156,9 @@ class HMerge {
Bytes.toString(tableName)
);
this.htd = FSTableDescriptors.getTableDescriptor(this.fs, this.tabledir);
Path logdir = new Path(tabledir, "merge_" + System.currentTimeMillis() +
HConstants.HREGION_LOGDIR_NAME);
Path oldLogDir = new Path(tabledir, HConstants.HREGION_OLDLOGDIR_NAME);
this.hlog = new HLog(fs, logdir, oldLogDir, conf);
String logname = "merge_" + System.currentTimeMillis() + HConstants.HREGION_LOGDIR_NAME;
this.hlog = HLogFactory.createHLog(fs, tabledir, logname, conf);
}
void process() throws IOException {

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
/**
* Contains utility methods for manipulating HBase meta tables.
@ -97,11 +98,10 @@ public class MetaUtils {
*/
public synchronized HLog getLog() throws IOException {
if (this.log == null) {
Path logdir = new Path(this.fs.getHomeDirectory(),
HConstants.HREGION_LOGDIR_NAME + "_" + System.currentTimeMillis());
Path oldLogDir = new Path(this.fs.getHomeDirectory(),
HConstants.HREGION_OLDLOGDIR_NAME);
this.log = new HLog(this.fs, logdir, oldLogDir, this.conf);
String logName =
HConstants.HREGION_LOGDIR_NAME + "_" + System.currentTimeMillis();
this.log = HLogFactory.createHLog(this.fs, this.fs.getHomeDirectory(),
logName, this.conf);
}
return this.log;
}

View File

@ -19,45 +19,49 @@ package org.apache.hadoop.hbase.backup.example;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.regionserver.CheckedArchivingHFileCleaner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Spin up a small cluster and check that the hfiles of region are properly long-term archived as
* specified via the {@link ZKTableArchiveClient}.
*/
@Category(LargeTests.class)
@Category(MediumTests.class)
public class TestZooKeeperTableArchiveClient {
private static final Log LOG = LogFactory.getLog(TestZooKeeperTableArchiveClient.class);
@ -65,10 +69,8 @@ public class TestZooKeeperTableArchiveClient {
private static final String STRING_TABLE_NAME = "test";
private static final byte[] TEST_FAM = Bytes.toBytes("fam");
private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
private static final int numRS = 2;
private static final int maxTries = 5;
private static final long ttl = 1000;
private static ZKTableArchiveClient archivingClient;
private final List<Path> toCleanup = new ArrayList<Path>();
/**
* Setup the config for the cluster
@ -76,44 +78,35 @@ public class TestZooKeeperTableArchiveClient {
@BeforeClass
public static void setupCluster() throws Exception {
setupConf(UTIL.getConfiguration());
UTIL.startMiniCluster(numRS);
UTIL.startMiniZKCluster();
archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), UTIL.getHBaseAdmin()
.getConnection());
// make hfile archiving node so we can archive files
ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
ZKUtil.createWithParents(watcher, archivingZNode);
}
private static void setupConf(Configuration conf) {
// disable the ui
conf.setInt("hbase.regionsever.info.port", -1);
// change the flush size to a small amount, regulating number of store files
conf.setInt("hbase.hregion.memstore.flush.size", 25000);
// so make sure we get a compaction when doing a load, but keep around some
// files in the store
conf.setInt("hbase.hstore.compaction.min", 10);
conf.setInt("hbase.hstore.compactionThreshold", 10);
// block writes if we get to 12 store files
conf.setInt("hbase.hstore.blockingStoreFiles", 12);
// only compact with 3 files
conf.setInt("hbase.hstore.compaction.min", 3);
// drop the number of attempts for the hbase admin
conf.setInt("hbase.client.retries.number", 1);
// set the ttl on the hfiles
conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
CheckedArchivingHFileCleaner.class.getCanonicalName(),
LongTermArchivingHFileCleaner.class.getCanonicalName());
}
@Before
public void setup() throws Exception {
UTIL.createTable(TABLE_NAME, TEST_FAM);
}
@After
public void tearDown() throws Exception {
UTIL.deleteTable(TABLE_NAME);
// and cleanup the archive directory
try {
UTIL.getTestFileSystem().delete(new Path(UTIL.getDefaultRootDirPath(), ".archive"), true);
FileSystem fs = UTIL.getTestFileSystem();
// cleanup each of the files/directories registered
for (Path file : toCleanup) {
// remove the table and archive directories
FSUtils.delete(fs, file, true);
}
} catch (IOException e) {
LOG.warn("Failure to delete archive directory", e);
} finally {
toCleanup.clear();
}
// make sure that backups are off for all tables
archivingClient.disableHFileBackup();
@ -122,7 +115,7 @@ public class TestZooKeeperTableArchiveClient {
@AfterClass
public static void cleanupTest() throws Exception {
try {
UTIL.shutdownMiniCluster();
UTIL.shutdownMiniZKCluster();
} catch (Exception e) {
LOG.warn("problem shutting down cluster", e);
}
@ -156,227 +149,263 @@ public class TestZooKeeperTableArchiveClient {
@Test
public void testArchivingOnSingleTable() throws Exception {
// turn on hfile retention
LOG.debug("----Starting archiving");
archivingClient.enableHFileBackupAsync(TABLE_NAME);
assertTrue("Archving didn't get turned on", archivingClient
.getArchivingEnabled(TABLE_NAME));
createArchiveDirectory();
FileSystem fs = UTIL.getTestFileSystem();
Path archiveDir = getArchiveDir();
Path tableDir = getTableDir(STRING_TABLE_NAME);
toCleanup.add(archiveDir);
toCleanup.add(tableDir);
// get the RS and region serving our table
List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
// make sure we only have 1 region serving this table
assertEquals(1, servingRegions.size());
HRegion region = servingRegions.get(0);
// get the parent RS and monitor
HRegionServer hrs = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
FileSystem fs = hrs.getFileSystem();
// put some data on the region
LOG.debug("-------Loading table");
UTIL.loadRegion(region, TEST_FAM);
loadAndCompact(region);
// check that we actually have some store files that were archived
Store store = region.getStore(TEST_FAM);
Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(),
region, store);
// check to make sure we archived some files
assertTrue("Didn't create a store archive directory", fs.exists(storeArchiveDir));
assertTrue("No files in the store archive",
FSUtils.listStatus(fs, storeArchiveDir, null).length > 0);
// and then put some non-tables files in the archive
Configuration conf = UTIL.getConfiguration();
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
// write a tmp file to the archive dir
Path tmpFile = new Path(archiveDir, "toDelete");
FSDataOutputStream out = fs.create(tmpFile);
out.write(1);
out.close();
// setup the delegate
Stoppable stop = new StoppableImplementation();
HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
assertTrue(fs.exists(tmpFile));
// make sure we wait long enough for the files to expire
Thread.sleep(ttl);
// create the region
HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
// print currrent state for comparison
FSUtils.logFileSystemState(fs, archiveDir, LOG);
loadFlushAndCompact(region, TEST_FAM);
// ensure there are no archived files after waiting for a timeout
ensureHFileCleanersRun();
// get the current hfiles in the archive directory
List<Path> files = getAllFiles(fs, archiveDir);
if (files == null) {
FSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
throw new RuntimeException("Didn't archive any files!");
}
CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());
// check to make sure the right things get deleted
assertTrue("Store archive got deleted", fs.exists(storeArchiveDir));
assertTrue("Archived HFiles got deleted",
FSUtils.listStatus(fs, storeArchiveDir, null).length > 0);
runCleaner(cleaner, finished, stop);
// know the cleaner ran, so now check all the files again to make sure they are still there
List<Path> archivedFiles = getAllFiles(fs, archiveDir);
assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles);
assertFalse(
"Tmp file (non-table archive file) didn't " + "get deleted, archive dir: "
+ fs.listStatus(archiveDir), fs.exists(tmpFile));
LOG.debug("Turning off hfile backup.");
// stop archiving the table
archivingClient.disableHFileBackup();
LOG.debug("Deleting table from archive.");
// now remove the archived table
Path primaryTable = new Path(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration()),
STRING_TABLE_NAME);
fs.delete(primaryTable, true);
LOG.debug("Deleted primary table, waiting for file cleaners to run");
// and make sure the archive directory is retained after a cleanup
// have to do this manually since delegates aren't run if there isn't any files in the archive
// dir to cleanup
Thread.sleep(ttl);
UTIL.getHBaseCluster().getMaster().getHFileCleaner().triggerNow();
Thread.sleep(ttl);
LOG.debug("File cleaners done, checking results.");
// but we still have the archive directory
assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
}
/**
* Make sure all the {@link HFileCleaner} run.
* <p>
* Blocking operation up to 3x ttl
* @throws InterruptedException
*/
private void ensureHFileCleanersRun() throws InterruptedException {
LOG.debug("Waiting on archive cleaners to run...");
CheckedArchivingHFileCleaner.resetCheck();
do {
UTIL.getHBaseCluster().getMaster().getHFileCleaner().triggerNow();
LOG.debug("Triggered, sleeping an amount until we can pass the check.");
Thread.sleep(ttl);
} while (!CheckedArchivingHFileCleaner.getChecked());
}
/**
* Test archiving/cleaning across multiple tables, where some are retained, and others aren't
* @throws Exception
* @throws Exception on failure
*/
@Test
public void testMultipleTables() throws Exception {
archivingClient.enableHFileBackupAsync(TABLE_NAME);
assertTrue("Archving didn't get turned on", archivingClient
.getArchivingEnabled(TABLE_NAME));
createArchiveDirectory();
String otherTable = "otherTable";
FileSystem fs = UTIL.getTestFileSystem();
Path archiveDir = getArchiveDir();
Path tableDir = getTableDir(STRING_TABLE_NAME);
Path otherTableDir = getTableDir(otherTable);
// register cleanup for the created directories
toCleanup.add(archiveDir);
toCleanup.add(tableDir);
toCleanup.add(otherTableDir);
Configuration conf = UTIL.getConfiguration();
// setup the delegate
Stoppable stop = new StoppableImplementation();
HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
// create the region
HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
loadFlushAndCompact(region, TEST_FAM);
// create the another table that we don't archive
String otherTable = "otherTable";
UTIL.createTable(Bytes.toBytes(otherTable), TEST_FAM);
hcd = new HColumnDescriptor(TEST_FAM);
HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
loadFlushAndCompact(otherRegion, TEST_FAM);
// get the parent RS and monitor
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
// get the current hfiles in the archive directory
List<Path> files = getAllFiles(fs, archiveDir);
if (files == null) {
FSUtils.logFileSystemState(fs, archiveDir, LOG);
throw new RuntimeException("Didn't load archive any files!");
}
// put data in the filesystem of the first table
LOG.debug("Loading data into:" + STRING_TABLE_NAME);
loadAndCompact(STRING_TABLE_NAME);
// make sure we have files from both tables
int initialCountForPrimary = 0;
int initialCountForOtherTable = 0;
for (Path file : files) {
String tableName = file.getParent().getParent().getParent().getName();
// check to which table this file belongs
if (tableName.equals(otherTable)) initialCountForOtherTable++;
else if (tableName.equals(STRING_TABLE_NAME)) initialCountForPrimary++;
}
// and some data in the other table
LOG.debug("Loading data into:" + otherTable);
loadAndCompact(otherTable);
assertTrue("Didn't archive files for:" + STRING_TABLE_NAME, initialCountForPrimary > 0);
assertTrue("Didn't archive files for:" + otherTable, initialCountForOtherTable > 0);
// make sure we wait long enough for the other table's files to expire
ensureHFileCleanersRun();
// run the cleaners
CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());
// run the cleaner
cleaner.start();
// wait for the cleaner to check all the files
finished.await();
// stop the cleaner
stop.stop("");
// know the cleaner ran, so now check all the files again to make sure they are still there
List<Path> archivedFiles = getAllFiles(fs, archiveDir);
int archivedForPrimary = 0;
for(Path file: archivedFiles) {
String tableName = file.getParent().getParent().getParent().getName();
// ensure we don't have files from the non-archived table
assertFalse("Have a file from the non-archived table: " + file, tableName.equals(otherTable));
if (tableName.equals(STRING_TABLE_NAME)) archivedForPrimary++;
}
assertEquals("Not all archived files for the primary table were retained.", initialCountForPrimary,
archivedForPrimary);
// check to make sure the right things get deleted
Path primaryStoreArchive = HFileArchiveTestingUtil.getStoreArchivePath(UTIL, STRING_TABLE_NAME,
TEST_FAM);
Path otherStoreArchive = HFileArchiveTestingUtil
.getStoreArchivePath(UTIL, otherTable, TEST_FAM);
// make sure the primary store doesn't have any files
assertTrue("Store archive got deleted", fs.exists(primaryStoreArchive));
assertTrue("Archived HFiles got deleted",
FSUtils.listStatus(fs, primaryStoreArchive, null).length > 0);
FileStatus[] otherArchiveFiles = FSUtils.listStatus(fs, otherStoreArchive, null);
assertNull("Archived HFiles (" + otherStoreArchive
+ ") should have gotten deleted, but didn't, remaining files:"
+ getPaths(otherArchiveFiles), otherArchiveFiles);
// sleep again to make sure we the other table gets cleaned up
ensureHFileCleanersRun();
// first pass removes the store archive
assertFalse(fs.exists(otherStoreArchive));
// second pass removes the region
ensureHFileCleanersRun();
Path parent = otherStoreArchive.getParent();
assertFalse(fs.exists(parent));
// third pass remove the table
ensureHFileCleanersRun();
parent = otherStoreArchive.getParent();
assertFalse(fs.exists(parent));
// but we still have the archive directory
assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
FSUtils.logFileSystemState(fs, HFileArchiveUtil.getArchivePath(UTIL.getConfiguration()), LOG);
UTIL.deleteTable(Bytes.toBytes(otherTable));
assertTrue("Archive directory was deleted via archiver", fs.exists(archiveDir));
}
private List<Path> getPaths(FileStatus[] files) {
if (files == null || files.length == 0) return null;
List<Path> paths = new ArrayList<Path>(files.length);
for (FileStatus file : files) {
paths.add(file.getPath());
}
return paths;
private void createArchiveDirectory() throws IOException {
//create the archive and test directory
FileSystem fs = UTIL.getTestFileSystem();
Path archiveDir = getArchiveDir();
fs.mkdirs(archiveDir);
}
private void loadAndCompact(String tableName) throws Exception {
byte[] table = Bytes.toBytes(tableName);
// get the RS and region serving our table
List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(table);
// make sure we only have 1 region serving this table
assertEquals(1, servingRegions.size());
HRegion region = servingRegions.get(0);
private Path getArchiveDir() throws IOException {
return new Path(UTIL.getDataTestDir(), HFileArchiveUtil.DEFAULT_HFILE_ARCHIVE_DIRECTORY);
}
// get the parent RS and monitor
HRegionServer hrs = UTIL.getRSForFirstRegionInTable(table);
FileSystem fs = hrs.getFileSystem();
private Path getTableDir(String tableName) throws IOException {
Path testDataDir = UTIL.getDataTestDir();
FSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
return new Path(testDataDir, tableName);
}
// put some data on the region
LOG.debug("-------Loading table");
UTIL.loadRegion(region, TEST_FAM);
loadAndCompact(region);
// check that we actually have some store files that were archived
Store store = region.getStore(TEST_FAM);
Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(),
region, store);
// check to make sure we archived some files
assertTrue("Didn't create a store archive directory", fs.exists(storeArchiveDir));
assertTrue("No files in the store archive",
FSUtils.listStatus(fs, storeArchiveDir, null).length > 0);
// wait for the compactions to finish
region.waitForFlushesAndCompactions();
private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
Stoppable stop) {
conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
LongTermArchivingHFileCleaner.class.getCanonicalName());
return new HFileCleaner(1000, stop, conf, fs, archiveDir);
}
/**
* Load the given region and then ensure that it compacts some files
* Start archiving table for given hfile cleaner
* @param tableName table to archive
* @param cleaner cleaner to check to make sure change propagated
* @return underlying {@link LongTermArchivingHFileCleaner} that is managing archiving
* @throws IOException on failure
* @throws KeeperException on failure
*/
private void loadAndCompact(HRegion region) throws Exception {
int tries = 0;
Exception last = null;
while (tries++ <= maxTries) {
try {
// load the region with data
UTIL.loadRegion(region, TEST_FAM);
// and then trigger a compaction to be sure we try to archive
compactRegion(region, TEST_FAM);
return;
} catch (Exception e) {
// keep this around for if we fail later
last = e;
private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
throws IOException, KeeperException {
// turn on hfile retention
LOG.debug("----Starting archiving for table:" + tableName);
archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
assertTrue("Archving didn't get turned on", archivingClient.getArchivingEnabled(tableName));
// wait for the archiver to get the notification
List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
while (!delegate.archiveTracker.keepHFiles(STRING_TABLE_NAME)) {
// spin until propagation - should be fast
}
return cleaners;
}
/**
* Spy on the {@link LongTermArchivingHFileCleaner} to ensure we can catch when the cleaner has
* seen all the files
* @return a {@link CountDownLatch} to wait on that releases when the cleaner has been called at
* least the expected number of times.
*/
private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
List<BaseHFileCleanerDelegate> cleaners, final int expected) {
// replace the cleaner with one that we can can check
BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
final int[] counter = new int[] { 0 };
final CountDownLatch finished = new CountDownLatch(1);
Mockito.doAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
counter[0]++;
LOG.debug(counter[0] + "/ " + expected + ") Mocking call to isFileDeletable");
if (counter[0] > expected) finished.countDown();
return (Boolean) invocation.callRealMethod();
}
}
throw last;
}).when(delegateSpy).isFileDeletable(Mockito.any(Path.class));
cleaners.set(0, delegateSpy);
return finished;
}
/**
* Compact all the store files in a given region.
* Get all the files (non-directory entries) in the file system under the passed directory
* @param dir directory to investigate
* @return all files under the directory
*/
private void compactRegion(HRegion region, byte[] family) throws IOException {
Store store = region.getStores().get(TEST_FAM);
store.compactRecentForTesting(store.getStorefiles().size());
private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
FileStatus[] files = FSUtils.listStatus(fs, dir, null);
if (files == null) return null;
List<Path> allFiles = new ArrayList<Path>();
for (FileStatus file : files) {
if (file.isDir()) {
List<Path> subFiles = getAllFiles(fs, file.getPath());
if (subFiles != null) allFiles.addAll(subFiles);
continue;
}
allFiles.add(file.getPath());
}
return allFiles;
}
private void loadFlushAndCompact(HRegion region, byte[] family) throws IOException {
// create two hfiles in the region
createHFileInRegion(region, family);
createHFileInRegion(region, family);
Store s = region.getStore(family);
int count = s.getStorefilesCount();
assertTrue("Don't have the expected store files, wanted >= 2 store files, but was:" + count,
count >= 2);
// compact the two files into one file to get files in the archive
LOG.debug("Compacting stores");
region.compactStores(true);
}
/**
* Create a new hfile in the passed region
* @param region region to operate on
* @param columnFamily family for which to add data
* @throws IOException
*/
private void createHFileInRegion(HRegion region, byte[] columnFamily) throws IOException {
// put one row in the region
Put p = new Put(Bytes.toBytes("row"));
p.add(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
region.put(p);
// flush the region to make a store file
region.flushcache();
}
/**
* @param cleaner
*/
private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
throws InterruptedException {
// run the cleaner
cleaner.start();
// wait for the cleaner to check all the files
finished.await();
// stop the cleaner
stop.stop("");
}
}

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -78,6 +79,7 @@ public class TestWALObserver {
private FileSystem fs;
private Path dir;
private Path hbaseRootDir;
private String logName;
private Path oldLogDir;
private Path logDir;
@ -112,6 +114,7 @@ public class TestWALObserver {
this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName());
this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME);
this.logName = HConstants.HREGION_LOGDIR_NAME;
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
@ -138,7 +141,8 @@ public class TestWALObserver {
deleteDir(basedir);
fs.mkdirs(new Path(basedir, hri.getEncodedName()));
HLog log = new HLog(this.fs, this.dir, this.oldLogDir, this.conf);
HLog log = HLogFactory.createHLog(this.fs, hbaseRootDir,
TestWALObserver.class.getName(), this.conf);
SampleRegionWALObserver cp = getCoprocessor(log);
// TEST_FAMILY[0] shall be removed from WALEdit.
@ -285,7 +289,8 @@ public class TestWALObserver {
*/
@Test
public void testWALObserverLoaded() throws Exception {
HLog log = new HLog(fs, dir, oldLogDir, conf);
HLog log = HLogFactory.createHLog(fs, hbaseRootDir,
TestWALObserver.class.getName(), conf);
assertNotNull(getCoprocessor(log));
}
@ -357,8 +362,7 @@ public class TestWALObserver {
return splits.get(0);
}
private HLog createWAL(final Configuration c) throws IOException {
HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c);
return wal;
return HLogFactory.createHLog(FileSystem.get(c), hbaseRootDir, logName, c);
}
private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
final byte [] rowName, final byte [] family,

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -397,7 +398,7 @@ public class TestBlockReorder {
// Check that it will be possible to extract a ServerName from our construction
Assert.assertNotNull("log= " + pseudoLogFile,
HLog.getServerNameFromHLogDirectoryName(dfs.getConf(), pseudoLogFile));
HLogUtil.getServerNameFromHLogDirectoryName(dfs.getConf(), pseudoLogFile));
// And check we're doing the right reorder.
lrb.reorderBlocks(conf, l, pseudoLogFile);

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogRecordReader;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
@ -64,7 +65,7 @@ public class TestHLogRecordReader {
private static final byte [] value = Bytes.toBytes("value");
private static HTableDescriptor htd;
private static Path logDir;
private static Path oldLogDir;
private static String logName;
private static String getName() {
return "TestHLogRecordReader";
@ -90,8 +91,10 @@ public class TestHLogRecordReader {
fs = TEST_UTIL.getDFSCluster().getFileSystem();
hbaseDir = TEST_UTIL.createRootDir();
logDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
logName = HConstants.HREGION_LOGDIR_NAME;
logDir = new Path(hbaseDir, logName);
htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(family));
}
@ -107,7 +110,8 @@ public class TestHLogRecordReader {
*/
@Test
public void testPartialRead() throws Exception {
HLog log = new HLog(fs, logDir, oldLogDir, conf);
HLog log = HLogFactory.createHLog(fs, hbaseDir,
logName, conf);
long ts = System.currentTimeMillis();
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
@ -163,7 +167,7 @@ public class TestHLogRecordReader {
*/
@Test
public void testHLogRecordReader() throws Exception {
HLog log = new HLog(fs, logDir, oldLogDir, conf);
HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf);
byte [] value = Bytes.toBytes("value");
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),

View File

@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -132,7 +134,7 @@ public class TestDistributedLogSplitting {
regions = ProtobufUtil.getOnlineRegions(hrs);
if (regions.size() != 0) break;
}
final Path logDir = new Path(rootdir, HLog.getHLogDirectoryName(hrs
final Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(hrs
.getServerName().toString()));
LOG.info("#regions = " + regions.size());
@ -153,7 +155,7 @@ public class TestDistributedLogSplitting {
Path tdir = HTableDescriptor.getTableDir(rootdir, table);
Path editsdir =
HLog.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
LOG.debug("checking edits dir " + editsdir);
FileStatus[] files = fs.listStatus(editsdir);
assertEquals(1, files.length);
@ -185,7 +187,7 @@ public class TestDistributedLogSplitting {
HRegionServer hrs = rsts.get(0).getRegionServer();
Path rootdir = FSUtils.getRootDir(conf);
final Path logDir = new Path(rootdir,
HLog.getHLogDirectoryName(hrs.getServerName().toString()));
HLogUtil.getHLogDirectoryName(hrs.getServerName().toString()));
installTable(new ZooKeeperWatcher(conf, "table-creation", null),
"table", "family", 40);
@ -433,7 +435,7 @@ public class TestDistributedLogSplitting {
private int countHLog(Path log, FileSystem fs, Configuration conf)
throws IOException {
int count = 0;
HLog.Reader in = HLog.getReader(fs, log, conf);
HLog.Reader in = HLogFactory.createReader(fs, log, conf);
while (in.next() != null) {
count++;
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
@ -157,11 +158,13 @@ public class TestCacheOnWriteInSchema {
// Create a store based on the schema
Path basedir = new Path(DIR);
Path logdir = new Path(DIR+"/logs");
Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
String logName = "logs";
Path logdir = new Path(DIR, logName);
fs.delete(logdir, true);
HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
HLog hlog = new HLog(fs, logdir, oldLogDir, conf);
HLog hlog = HLogFactory.createHLog(fs, basedir, logName, conf);
HRegion region = new HRegion(basedir, hlog, fs, conf, info, htd, null);
store = new HStore(basedir, region, hcd, fs, conf);
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.collect.Lists;
@ -73,7 +74,8 @@ public class TestCompactSelection extends TestCase {
//Setting up a Store
Path basedir = new Path(DIR);
Path logdir = new Path(DIR+"/logs");
String logName = "logs";
Path logdir = new Path(DIR, logName);
Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family"));
FileSystem fs = FileSystem.get(conf);
@ -84,7 +86,8 @@ public class TestCompactSelection extends TestCase {
htd.addFamily(hcd);
HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
HLog hlog = new HLog(fs, logdir, oldLogDir, conf);
HLog hlog = HLogFactory.createHLog(fs, basedir,
logName, conf);
HRegion region = HRegion.createHRegion(info, basedir, conf, htd);
HRegion.closeHRegion(region);
Path tableDir = new Path(basedir, Bytes.toString(htd.getName()));

View File

@ -80,7 +80,10 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.HLogMetrics;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -214,7 +217,7 @@ public class TestHRegion extends HBaseTestCase {
FileSystem fs = region.getFilesystem();
byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
Path recoveredEditsDir = HLog.getRegionDirRecoveredEditsDir(regiondir);
Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
long maxSeqId = 1050;
long minSeqId = 1000;
@ -222,7 +225,8 @@ public class TestHRegion extends HBaseTestCase {
for (long i = minSeqId; i <= maxSeqId; i += 10) {
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
fs.create(recoveredEdits);
HLog.Writer writer = HLog.createWriter(fs, recoveredEdits, conf);
HLog.Writer writer = HLogFactory.createWriter(fs,
recoveredEdits, conf);
long time = System.nanoTime();
WALEdit edit = new WALEdit();
@ -265,7 +269,7 @@ public class TestHRegion extends HBaseTestCase {
FileSystem fs = region.getFilesystem();
byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
Path recoveredEditsDir = HLog.getRegionDirRecoveredEditsDir(regiondir);
Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
long maxSeqId = 1050;
long minSeqId = 1000;
@ -273,7 +277,8 @@ public class TestHRegion extends HBaseTestCase {
for (long i = minSeqId; i <= maxSeqId; i += 10) {
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
fs.create(recoveredEdits);
HLog.Writer writer = HLog.createWriter(fs, recoveredEdits, conf);
HLog.Writer writer = HLogFactory.createWriter(fs,
recoveredEdits, conf);
long time = System.nanoTime();
WALEdit edit = new WALEdit();
@ -320,7 +325,7 @@ public class TestHRegion extends HBaseTestCase {
Path regiondir = region.getRegionDir();
FileSystem fs = region.getFilesystem();
Path recoveredEditsDir = HLog.getRegionDirRecoveredEditsDir(regiondir);
Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
for (int i = 1000; i < 1050; i += 10) {
Path recoveredEdits = new Path(
recoveredEditsDir, String.format("%019d", i));
@ -594,8 +599,8 @@ public class TestHRegion extends HBaseTestCase {
byte[] val = Bytes.toBytes("val");
this.region = initHRegion(b, getName(), cf);
try {
HLog.getSyncTime(); // clear counter from prior tests
assertEquals(0, HLog.getSyncTime().count);
HLogMetrics.getSyncTime(); // clear counter from prior tests
assertEquals(0, HLogMetrics.getSyncTime().count);
LOG.info("First a batch put with all valid puts");
final Put[] puts = new Put[10];
@ -610,7 +615,7 @@ public class TestHRegion extends HBaseTestCase {
assertEquals(OperationStatusCode.SUCCESS, codes[i]
.getOperationStatusCode());
}
assertEquals(1, HLog.getSyncTime().count);
assertEquals(1, HLogMetrics.getSyncTime().count);
LOG.info("Next a batch put with one invalid family");
puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
@ -620,7 +625,7 @@ public class TestHRegion extends HBaseTestCase {
assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
}
assertEquals(1, HLog.getSyncTime().count);
assertEquals(1, HLogMetrics.getSyncTime().count);
LOG.info("Next a batch put that has to break into two batches to avoid a lock");
Integer lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
@ -641,7 +646,7 @@ public class TestHRegion extends HBaseTestCase {
LOG.info("...waiting for put thread to sync first time");
long startWait = System.currentTimeMillis();
while (HLog.getSyncTime().count == 0) {
while (HLogMetrics.getSyncTime().count == 0) {
Thread.sleep(100);
if (System.currentTimeMillis() - startWait > 10000) {
fail("Timed out waiting for thread to sync first minibatch");
@ -652,7 +657,7 @@ public class TestHRegion extends HBaseTestCase {
LOG.info("...joining on thread");
ctx.stop();
LOG.info("...checking that next batch was synced");
assertEquals(1, HLog.getSyncTime().count);
assertEquals(1, HLogMetrics.getSyncTime().count);
codes = retFromThread.get();
for (int i = 0; i < 10; i++) {
assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
@ -676,7 +681,7 @@ public class TestHRegion extends HBaseTestCase {
OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
}
// Make sure we didn't do an extra batch
assertEquals(1, HLog.getSyncTime().count);
assertEquals(1, HLogMetrics.getSyncTime().count);
// Make sure we still hold lock
assertTrue(region.isRowLocked(lockedRow));
@ -702,8 +707,8 @@ public class TestHRegion extends HBaseTestCase {
this.region = initHRegion(b, getName(), conf, cf);
try{
HLog.getSyncTime(); // clear counter from prior tests
assertEquals(0, HLog.getSyncTime().count);
HLogMetrics.getSyncTime(); // clear counter from prior tests
assertEquals(0, HLogMetrics.getSyncTime().count);
final Put[] puts = new Put[10];
for (int i = 0; i < 10; i++) {
@ -717,7 +722,7 @@ public class TestHRegion extends HBaseTestCase {
assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i]
.getOperationStatusCode());
}
assertEquals(0, HLog.getSyncTime().count);
assertEquals(0, HLogMetrics.getSyncTime().count);
} finally {

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.zookeeper.KeeperException;
@ -73,9 +74,9 @@ public class TestSplitTransaction {
this.fs = FileSystem.get(TEST_UTIL.getConfiguration());
TEST_UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, CustomObserver.class.getName());
this.fs.delete(this.testdir, true);
this.wal = new HLog(fs, new Path(this.testdir, "logs"),
new Path(this.testdir, "archive"),
this.wal = HLogFactory.createHLog(fs, this.testdir, "logs",
TEST_UTIL.getConfiguration());
this.parent = createRegion(this.testdir, this.wal);
RegionCoprocessorHost host = new RegionCoprocessorHost(this.parent, null, TEST_UTIL.getConfiguration());
this.parent.setCoprocessorHost(host);

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -134,8 +135,9 @@ public class TestStore extends TestCase {
HColumnDescriptor hcd) throws IOException {
//Setting up a Store
Path basedir = new Path(DIR+methodName);
Path logdir = new Path(DIR+methodName+"/logs");
Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
String logName = "logs";
Path logdir = new Path(basedir, logName);
FileSystem fs = FileSystem.get(conf);
fs.delete(logdir, true);
@ -143,7 +145,7 @@ public class TestStore extends TestCase {
HTableDescriptor htd = new HTableDescriptor(table);
htd.addFamily(hcd);
HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
HLog hlog = new HLog(fs, logdir, oldLogDir, conf);
HLog hlog = HLogFactory.createHLog(fs, basedir, logName, conf);
HRegion region = new HRegion(basedir, hlog, fs, conf, info, htd, null);
store = new HStore(basedir, region, hcd, fs, conf);

View File

@ -24,6 +24,7 @@ import java.util.LinkedList;
import java.util.Queue;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
public class FaultySequenceFileLogReader extends SequenceFileLogReader {
@ -45,7 +46,7 @@ public class FaultySequenceFileLogReader extends SequenceFileLogReader {
if (nextQueue.isEmpty()) { // Read the whole thing at once and fake reading
while (b == true) {
HLogKey key = HLog.newKey(conf);
HLogKey key = HLogUtil.newKey(conf);
WALEdit val = new WALEdit();
HLog.Entry e = new HLog.Entry(key, val);
b = this.reader.next(e.getKey(), e.getEdit());

View File

@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
/**
@ -178,8 +180,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
// Initialize Table Descriptor
HTableDescriptor htd = createHTableDescriptor(numFamilies);
final long whenToRoll = roll;
HLog hlog = new HLog(fs, new Path(rootRegionDir, "wals"),
new Path(rootRegionDir, "old.wals"), getConf()) {
HLog hlog = new FSHLog(fs, rootRegionDir, "wals", getConf()) {
int appends = 0;
protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
HTableDescriptor htd)
@ -204,7 +205,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
region = null;
}
if (verify) {
Path dir = hlog.getDir();
Path dir = ((FSHLog) hlog).getDir();
long editCount = 0;
for (FileStatus fss: fs.listStatus(dir)) {
editCount += verify(fss.getPath(), verbose);
@ -244,7 +245,8 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
* @throws IOException
*/
private long verify(final Path wal, final boolean verbose) throws IOException {
HLog.Reader reader = HLog.getReader(wal.getFileSystem(getConf()), wal, getConf());
HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()),
wal, getConf());
long previousSeqid = -1;
long count = 0;
try {

View File

@ -36,10 +36,10 @@ public class HLogUtilsForTests {
* @return
*/
public static int getNumLogFiles(HLog log) {
return log.getNumLogFiles();
return ((FSHLog) log).getNumLogFiles();
}
public static int getNumEntries(HLog log) {
return log.getNumEntries();
return ((FSHLog) log).getNumEntries();
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.Coprocessor;
@ -158,7 +159,8 @@ public class TestHLog {
final byte [] tableName = Bytes.toBytes(getName());
final byte [] rowName = tableName;
Path logdir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
HLog log = new HLog(fs, logdir, oldLogDir, conf);
HLog log = HLogFactory.createHLog(fs, hbaseDir,
HConstants.HREGION_LOGDIR_NAME, conf);
final int howmany = 3;
HRegionInfo[] infos = new HRegionInfo[3];
Path tabledir = new Path(hbaseDir, getName());
@ -235,8 +237,9 @@ public class TestHLog {
assertEquals(bytes.length, read);
out.close();
in.close();
Path subdir = new Path(dir, "hlogdir");
HLog wal = new HLog(fs, subdir, oldLogDir, conf);
HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir", conf);
final int total = 20;
HLog.Reader reader = null;
@ -255,8 +258,8 @@ public class TestHLog {
// gives you EOFE.
wal.sync();
// Open a Reader.
Path walPath = wal.computeFilename();
reader = HLog.getReader(fs, walPath, conf);
Path walPath = ((FSHLog) wal).computeFilename();
reader = HLogFactory.createReader(fs, walPath, conf);
int count = 0;
HLog.Entry entry = new HLog.Entry();
while ((entry = reader.next(entry)) != null) count++;
@ -269,14 +272,14 @@ public class TestHLog {
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
wal.append(info, bytes, kvs, System.currentTimeMillis(), htd);
}
reader = HLog.getReader(fs, walPath, conf);
reader = HLogFactory.createReader(fs, walPath, conf);
count = 0;
while((entry = reader.next(entry)) != null) count++;
assertTrue(count >= total);
reader.close();
// If I sync, should see double the edits.
wal.sync();
reader = HLog.getReader(fs, walPath, conf);
reader = HLogFactory.createReader(fs, walPath, conf);
count = 0;
while((entry = reader.next(entry)) != null) count++;
assertEquals(total * 2, count);
@ -290,14 +293,14 @@ public class TestHLog {
}
// Now I should have written out lots of blocks. Sync then read.
wal.sync();
reader = HLog.getReader(fs, walPath, conf);
reader = HLogFactory.createReader(fs, walPath, conf);
count = 0;
while((entry = reader.next(entry)) != null) count++;
assertEquals(total * 3, count);
reader.close();
// Close it and ensure that closed, Reader gets right length also.
wal.close();
reader = HLog.getReader(fs, walPath, conf);
reader = HLogFactory.createReader(fs, walPath, conf);
count = 0;
while((entry = reader.next(entry)) != null) count++;
assertEquals(total * 3, count);
@ -320,11 +323,11 @@ public class TestHLog {
regionsToSeqids.put(l.toString().getBytes(), l);
}
byte [][] regions =
HLog.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids);
HLogUtil.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids);
assertEquals(2, regions.length);
assertTrue(Bytes.equals(regions[0], "0".getBytes()) ||
Bytes.equals(regions[0], "1".getBytes()));
regions = HLog.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids);
regions = HLogUtil.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids);
int count = 4;
assertEquals(count, regions.length);
// Regions returned are not ordered.
@ -341,7 +344,7 @@ public class TestHLog {
assertEquals(howmany, splits.size());
for (int i = 0; i < splits.size(); i++) {
LOG.info("Verifying=" + splits.get(i));
HLog.Reader reader = HLog.getReader(fs, splits.get(i), conf);
HLog.Reader reader = HLogFactory.createReader(fs, splits.get(i), conf);
try {
int count = 0;
String previousRegion = null;
@ -377,9 +380,9 @@ public class TestHLog {
byte [] tableName = Bytes.toBytes(getName());
HRegionInfo regioninfo = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
Path subdir = new Path(dir, "hlogdir");
Path archdir = new Path(dir, "hlogdir_archive");
HLog wal = new HLog(fs, subdir, archdir, conf);
HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir",
"hlogdir_archive", conf);
final int total = 20;
HTableDescriptor htd = new HTableDescriptor();
@ -393,7 +396,7 @@ public class TestHLog {
// Now call sync to send the data to HDFS datanodes
wal.sync();
int namenodePort = cluster.getNameNodePort();
final Path walPath = wal.computeFilename();
final Path walPath = ((FSHLog) wal).computeFilename();
// Stop the cluster. (ensure restart since we're sharing MiniDFSCluster)
@ -477,7 +480,7 @@ public class TestHLog {
SequenceFile.Reader reader
= new SequenceFile.Reader(this.fs, walPath, this.conf);
int count = 0;
HLogKey key = HLog.newKey(conf);
HLogKey key = HLogUtil.newKey(conf);
WALEdit val = new WALEdit();
while (reader.next(key, val)) {
count++;
@ -500,7 +503,8 @@ public class TestHLog {
HLog.Reader reader = null;
HLog log = null;
try {
log = new HLog(fs, dir, oldLogDir, conf);
log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
// Write columns named 1, 2, 3, etc. and then values of single byte
// 1, 2, 3...
long timestamp = System.currentTimeMillis();
@ -520,10 +524,10 @@ public class TestHLog {
log.completeCacheFlush(info.getEncodedNameAsBytes(), tableName, logSeqId,
info.isMetaRegion());
log.close();
Path filename = log.computeFilename();
Path filename = ((FSHLog) log).computeFilename();
log = null;
// Now open a reader on the log and assert append worked.
reader = HLog.getReader(fs, filename, conf);
reader = HLogFactory.createReader(fs, filename, conf);
// Above we added all columns on a single row so we only read one
// entry in the below... thats why we have '1'.
for (int i = 0; i < 1; i++) {
@ -548,7 +552,7 @@ public class TestHLog {
KeyValue kv = val.getKeyValues().get(0);
assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily()));
assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
assertEquals(0, Bytes.compareTo(HLogUtil.COMPLETE_CACHE_FLUSH,
val.getKeyValues().get(0).getValue()));
System.out.println(key + " " + val);
}
@ -571,7 +575,7 @@ public class TestHLog {
final byte [] tableName = Bytes.toBytes("tablename");
final byte [] row = Bytes.toBytes("row");
Reader reader = null;
HLog log = new HLog(fs, dir, oldLogDir, conf);
HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
try {
// Write columns named 1, 2, 3, etc. and then values of single byte
// 1, 2, 3...
@ -590,10 +594,10 @@ public class TestHLog {
long logSeqId = log.startCacheFlush(hri.getEncodedNameAsBytes());
log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, logSeqId, false);
log.close();
Path filename = log.computeFilename();
Path filename = ((FSHLog) log).computeFilename();
log = null;
// Now open a reader on the log and assert append worked.
reader = HLog.getReader(fs, filename, conf);
reader = HLogFactory.createReader(fs, filename, conf);
HLog.Entry entry = reader.next();
assertEquals(COL_COUNT, entry.getEdit().size());
int idx = 0;
@ -616,7 +620,7 @@ public class TestHLog {
assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
assertEquals(0, Bytes.compareTo(HLogUtil.COMPLETE_CACHE_FLUSH,
val.getValue()));
System.out.println(entry.getKey() + " " + val);
}
@ -639,7 +643,7 @@ public class TestHLog {
final int COL_COUNT = 10;
final byte [] tableName = Bytes.toBytes("tablename");
final byte [] row = Bytes.toBytes("row");
HLog log = new HLog(fs, dir, oldLogDir, conf);
HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
try {
DumbWALActionsListener visitor = new DumbWALActionsListener();
log.registerWALActionsListener(visitor);
@ -675,7 +679,8 @@ public class TestHLog {
final byte [] tableName = Bytes.toBytes("testLogCleaning");
final byte [] tableName2 = Bytes.toBytes("testLogCleaning2");
HLog log = new HLog(fs, dir, oldLogDir, conf);
HLog log = HLogFactory.createHLog(fs, hbaseDir,
getName(), conf);
try {
HRegionInfo hri = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
@ -686,12 +691,12 @@ public class TestHLog {
// Before HBASE-3198 it used to delete it
addEdits(log, hri, tableName, 1);
log.rollWriter();
assertEquals(1, log.getNumLogFiles());
assertEquals(1, ((FSHLog) log).getNumLogFiles());
// See if there's anything wrong with more than 1 edit
addEdits(log, hri, tableName, 2);
log.rollWriter();
assertEquals(2, log.getNumLogFiles());
assertEquals(2, ((FSHLog) log).getNumLogFiles());
// Now mix edits from 2 regions, still no flushing
addEdits(log, hri, tableName, 1);
@ -699,14 +704,14 @@ public class TestHLog {
addEdits(log, hri, tableName, 1);
addEdits(log, hri2, tableName2, 1);
log.rollWriter();
assertEquals(3, log.getNumLogFiles());
assertEquals(3, ((FSHLog) log).getNumLogFiles());
// Flush the first region, we expect to see the first two files getting
// archived
long seqId = log.startCacheFlush(hri.getEncodedNameAsBytes());
log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, seqId, false);
log.rollWriter();
assertEquals(2, log.getNumLogFiles());
assertEquals(2, ((FSHLog) log).getNumLogFiles());
// Flush the second region, which removes all the remaining output files
// since the oldest was completely flushed and the two others only contain
@ -714,7 +719,7 @@ public class TestHLog {
seqId = log.startCacheFlush(hri2.getEncodedNameAsBytes());
log.completeCacheFlush(hri2.getEncodedNameAsBytes(), tableName2, seqId, false);
log.rollWriter();
assertEquals(0, log.getNumLogFiles());
assertEquals(0, ((FSHLog) log).getNumLogFiles());
} finally {
if (log != null) log.closeAndDelete();
}
@ -724,23 +729,23 @@ public class TestHLog {
@Test
public void testGetServerNameFromHLogDirectoryName() throws IOException {
String hl = conf.get(HConstants.HBASE_DIR) + "/"+
HLog.getHLogDirectoryName(new ServerName("hn", 450, 1398).toString());
HLogUtil.getHLogDirectoryName(new ServerName("hn", 450, 1398).toString());
// Must not throw exception
Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, null));
Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf,
Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, null));
Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf,
conf.get(HConstants.HBASE_DIR) + "/"));
Assert.assertNull( HLog.getServerNameFromHLogDirectoryName(conf, "") );
Assert.assertNull( HLog.getServerNameFromHLogDirectoryName(conf, " ") );
Assert.assertNull( HLog.getServerNameFromHLogDirectoryName(conf, hl) );
Assert.assertNull( HLog.getServerNameFromHLogDirectoryName(conf, hl+"qdf") );
Assert.assertNull( HLog.getServerNameFromHLogDirectoryName(conf, "sfqf"+hl+"qdf") );
Assert.assertNull( HLogUtil.getServerNameFromHLogDirectoryName(conf, "") );
Assert.assertNull( HLogUtil.getServerNameFromHLogDirectoryName(conf, " ") );
Assert.assertNull( HLogUtil.getServerNameFromHLogDirectoryName(conf, hl) );
Assert.assertNull( HLogUtil.getServerNameFromHLogDirectoryName(conf, hl+"qdf") );
Assert.assertNull( HLogUtil.getServerNameFromHLogDirectoryName(conf, "sfqf"+hl+"qdf") );
Assert.assertNotNull( HLog.getServerNameFromHLogDirectoryName(conf, conf.get(
Assert.assertNotNull( HLogUtil.getServerNameFromHLogDirectoryName(conf, conf.get(
HConstants.HBASE_DIR) +
"/.logs/localhost,32984,1343316388997/localhost%2C32984%2C1343316388997.1343316390417"
));
Assert.assertNotNull( HLog.getServerNameFromHLogDirectoryName(conf, hl+"/qdf") );
Assert.assertNotNull( HLogUtil.getServerNameFromHLogDirectoryName(conf, hl+"/qdf") );
}
/**
@ -749,7 +754,8 @@ public class TestHLog {
@Test
public void testWALCoprocessorLoaded() throws Exception {
// test to see whether the coprocessor is loaded or not.
HLog log = new HLog(fs, dir, oldLogDir, conf);
HLog log = HLogFactory.createHLog(fs, hbaseDir,
getName(), conf);
try {
WALCoprocessorHost host = log.getCoprocessorHost();
Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());

View File

@ -56,7 +56,7 @@ public class TestHLogMethods {
Path regiondir = util.getDataTestDir("regiondir");
fs.delete(regiondir, true);
fs.mkdirs(regiondir);
Path recoverededits = HLog.getRegionDirRecoveredEditsDir(regiondir);
Path recoverededits = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
String first = HLogSplitter.formatRecoveredEditsFileName(-1);
createFile(fs, recoverededits, first);
createFile(fs, recoverededits, HLogSplitter.formatRecoveredEditsFileName(0));
@ -70,7 +70,10 @@ public class TestHLogMethods {
createFile(fs, recoverededits, last);
createFile(fs, recoverededits,
Long.toString(Long.MAX_VALUE) + "." + System.currentTimeMillis());
NavigableSet<Path> files = HLog.getSplitEditFilesSorted(fs, regiondir);
HLog log = HLogFactory.createHLog(fs, regiondir,
"dummyLogName", util.getConfiguration());
NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
assertEquals(7, files.size());
assertEquals(files.pollFirst().getName(), first);
assertEquals(files.pollLast().getName(), last);

View File

@ -349,7 +349,7 @@ public class TestHLogSplit {
Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
Reader.class);
InstrumentedSequenceFileLogWriter.activateFailure = false;
HLog.resetLogReaderClass();
HLogFactory.resetLogReaderClass();
try {
Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
@ -371,7 +371,7 @@ public class TestHLogSplit {
} finally {
conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
Reader.class);
HLog.resetLogReaderClass();
HLogFactory.resetLogReaderClass();
}
}
@ -382,7 +382,7 @@ public class TestHLogSplit {
Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
Reader.class);
InstrumentedSequenceFileLogWriter.activateFailure = false;
HLog.resetLogReaderClass();
HLogFactory.resetLogReaderClass();
try {
conf.setClass("hbase.regionserver.hlog.reader.impl",
@ -396,7 +396,7 @@ public class TestHLogSplit {
} finally {
conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
Reader.class);
HLog.resetLogReaderClass();
HLogFactory.resetLogReaderClass();
}
}
@ -408,7 +408,7 @@ public class TestHLogSplit {
Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
Reader.class);
InstrumentedSequenceFileLogWriter.activateFailure = false;
HLog.resetLogReaderClass();
HLogFactory.resetLogReaderClass();
try {
conf.setClass("hbase.regionserver.hlog.reader.impl",
@ -428,7 +428,7 @@ public class TestHLogSplit {
} finally {
conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
Reader.class);
HLog.resetLogReaderClass();
HLogFactory.resetLogReaderClass();
}
}
@ -455,7 +455,7 @@ public class TestHLogSplit {
Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
int actualCount = 0;
HLog.Reader in = HLog.getReader(fs, splitLog, conf);
HLog.Reader in = HLogFactory.createReader(fs, splitLog, conf);
HLog.Entry entry;
while ((entry = in.next()) != null) ++actualCount;
assertEquals(entryCount-1, actualCount);
@ -840,14 +840,16 @@ public class TestHLogSplit {
long oldFlushInterval = conf.getLong(F_INTERVAL, 1000);
conf.setLong(F_INTERVAL, 1000*1000*100);
HLog log = null;
Path thisTestsDir = new Path(hbaseDir, "testLogRollAfterSplitStart");
String logName = "testLogRollAfterSplitStart";
Path thisTestsDir = new Path(hbaseDir, logName);
try {
// put some entries in an HLog
byte [] tableName = Bytes.toBytes(this.getClass().getName());
HRegionInfo regioninfo = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
log = new HLog(fs, thisTestsDir, oldLogDir, conf);
log = HLogFactory.createHLog(fs, hbaseDir, logName, conf);
final int total = 20;
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
@ -858,7 +860,7 @@ public class TestHLogSplit {
}
// Send the data to HDFS datanodes and close the HDFS writer
log.sync();
log.cleanupCurrentWriter(log.getFilenum());
((FSHLog) log).cleanupCurrentWriter(log.getFilenum());
/* code taken from ProcessServerShutdown.process()
* handles RS shutdowns (as observed by the Master)
@ -980,7 +982,7 @@ public class TestHLogSplit {
}
fs.mkdirs(new Path(tableDir, region));
HLog.Writer writer = HLog.createWriter(fs,
HLog.Writer writer = HLogFactory.createWriter(fs,
julietLog, conf);
appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(),
("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0);
@ -1120,16 +1122,18 @@ public class TestHLogSplit {
regions.add(regionName);
generateHLogs(-1);
final HLog log = HLogFactory.createHLog(fs, regiondir,
regionName, conf);
HLogSplitter logSplitter = new HLogSplitter(
conf, hbaseDir, hlogDir, oldLogDir, fs, null) {
protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
throws IOException {
HLog.Writer writer = HLog.createWriter(fs, logfile, conf);
HLog.Writer writer = HLogFactory.createWriter(fs, logfile, conf);
// After creating writer, simulate region's
// replayRecoveredEditsIfAny() which gets SplitEditFiles of this
// region and delete them, excluding files with '.temp' suffix.
NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs,
regiondir);
NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
if (files != null && !files.isEmpty()) {
for (Path file : files) {
if (!this.fs.delete(file, false)) {
@ -1179,7 +1183,8 @@ public class TestHLogSplit {
makeRegionDirs(fs, regions);
fs.mkdirs(hlogDir);
for (int i = 0; i < writers; i++) {
writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf);
writer[i] = HLogFactory.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i),
conf);
for (int j = 0; j < entries; j++) {
int prefix = 0;
for (String region : regions) {
@ -1198,7 +1203,7 @@ public class TestHLogSplit {
private Path getLogForRegion(Path rootdir, byte[] table, String region)
throws IOException {
Path tdir = HTableDescriptor.getTableDir(rootdir, table);
Path editsdir = HLog.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
Bytes.toString(region.getBytes())));
FileStatus [] files = this.fs.listStatus(editsdir);
assertEquals(1, files.length);
@ -1283,7 +1288,7 @@ public class TestHLogSplit {
@SuppressWarnings("unused")
private void dumpHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
HLog.Entry entry;
HLog.Reader in = HLog.getReader(fs, log, conf);
HLog.Reader in = HLogFactory.createReader(fs, log, conf);
while ((entry = in.next()) != null) {
System.out.println(entry);
}
@ -1291,7 +1296,7 @@ public class TestHLogSplit {
private int countHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
int count = 0;
HLog.Reader in = HLog.getReader(fs, log, conf);
HLog.Reader in = HLogFactory.createReader(fs, log, conf);
while (in.next() != null) {
count++;
}
@ -1324,8 +1329,8 @@ public class TestHLogSplit {
private void injectEmptyFile(String suffix, boolean closeFile)
throws IOException {
HLog.Writer writer = HLog.createWriter(
fs, new Path(hlogDir, HLOG_FILE_PREFIX + suffix), conf);
HLog.Writer writer = HLogFactory.createWriter(
fs, new Path(hlogDir, HLOG_FILE_PREFIX + suffix), conf);
if (closeFile) writer.close();
}
@ -1352,10 +1357,10 @@ public class TestHLogSplit {
for (int i = 0; i < f1.length; i++) {
// Regions now have a directory named RECOVERED_EDITS_DIR and in here
// are split edit files. In below presume only 1.
Path rd1 = HLog.getRegionDirRecoveredEditsDir(f1[i].getPath());
Path rd1 = HLogUtil.getRegionDirRecoveredEditsDir(f1[i].getPath());
FileStatus[] rd1fs = fs.listStatus(rd1);
assertEquals(1, rd1fs.length);
Path rd2 = HLog.getRegionDirRecoveredEditsDir(f2[i].getPath());
Path rd2 = HLogUtil.getRegionDirRecoveredEditsDir(f2[i].getPath());
FileStatus[] rd2fs = fs.listStatus(rd2);
assertEquals(1, rd2fs.length);
if (!logsAreEqual(rd1fs[0].getPath(), rd2fs[0].getPath())) {
@ -1367,8 +1372,8 @@ public class TestHLogSplit {
private boolean logsAreEqual(Path p1, Path p2) throws IOException {
HLog.Reader in1, in2;
in1 = HLog.getReader(fs, p1, conf);
in2 = HLog.getReader(fs, p2, conf);
in1 = HLogFactory.createReader(fs, p1, conf);
in2 = HLogFactory.createReader(fs, p2, conf);
HLog.Entry entry1;
HLog.Entry entry2;
while ((entry1 = in1.next()) != null) {

View File

@ -138,7 +138,7 @@ public class TestLogRollAbort {
HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
HLog log = server.getWAL();
assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
// don't run this test without append support (HDFS-200 & HDFS-142)
assertTrue("Need append support for this test",
FSUtils.isAppendSupported(TEST_UTIL.getConfiguration()));
@ -156,13 +156,13 @@ public class TestLogRollAbort {
dfsCluster.restartDataNodes();
LOG.info("Restarted datanodes");
assertTrue("Should have an outstanding WAL edit", log.hasDeferredEntries());
assertTrue("Should have an outstanding WAL edit", ((FSHLog) log).hasDeferredEntries());
try {
log.rollWriter(true);
fail("Log roll should have triggered FailedLogCloseException");
} catch (FailedLogCloseException flce) {
assertTrue("Should have deferred flush log edits outstanding",
log.hasDeferredEntries());
((FSHLog) log).hasDeferredEntries());
}
}

View File

@ -192,7 +192,7 @@ public class TestLogRolling {
public void testLogRolling() throws FailedLogCloseException, IOException {
this.tableName = getName();
startAndWriteData();
LOG.info("after writing there are " + log.getNumLogFiles() + " log files");
LOG.info("after writing there are " + ((FSHLog) log).getNumLogFiles() + " log files");
// flush all regions
@ -205,9 +205,9 @@ public class TestLogRolling {
// Now roll the log
log.rollWriter();
int count = log.getNumLogFiles();
int count = ((FSHLog) log).getNumLogFiles();
LOG.info("after flushing all regions and rolling logs there are " +
log.getNumLogFiles() + " log files");
((FSHLog) log).getNumLogFiles() + " log files");
assertTrue(("actual count: " + count), count <= 2);
}
@ -268,7 +268,7 @@ public class TestLogRolling {
*/
DatanodeInfo[] getPipeline(HLog log) throws IllegalArgumentException,
IllegalAccessException, InvocationTargetException {
OutputStream stm = log.getOutputStream();
OutputStream stm = ((FSHLog) log).getOutputStream();
Method getPipeline = null;
for (Method m : stm.getClass().getDeclaredMethods()) {
if (m.getName().endsWith("getPipeline")) {
@ -315,7 +315,7 @@ public class TestLogRolling {
server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
this.log = server.getWAL();
assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
// don't run this test without append support (HDFS-200 & HDFS-142)
assertTrue("Need append support for this test", FSUtils
.isAppendSupported(TEST_UTIL.getConfiguration()));
@ -342,12 +342,12 @@ public class TestLogRolling {
writeData(table, 2);
long curTime = System.currentTimeMillis();
long oldFilenum = log.getFilenum();
long oldFilenum = ((FSHLog) log).getFilenum();
assertTrue("Log should have a timestamp older than now",
curTime > oldFilenum && oldFilenum != -1);
assertTrue("The log shouldn't have rolled yet",
oldFilenum == log.getFilenum());
oldFilenum == ((FSHLog) log).getFilenum());
final DatanodeInfo[] pipeline = getPipeline(log);
assertTrue(pipeline.length == fs.getDefaultReplication());
@ -357,7 +357,7 @@ public class TestLogRolling {
// this write should succeed, but trigger a log roll
writeData(table, 2);
long newFilenum = log.getFilenum();
long newFilenum = ((FSHLog) log).getFilenum();
assertTrue("Missing datanode should've triggered a log roll",
newFilenum > oldFilenum && newFilenum > curTime);
@ -365,7 +365,7 @@ public class TestLogRolling {
// write some more log data (this should use a new hdfs_out)
writeData(table, 3);
assertTrue("The log should not roll again.",
log.getFilenum() == newFilenum);
((FSHLog) log).getFilenum() == newFilenum);
// kill another datanode in the pipeline, so the replicas will be lower than
// the configured value 2.
assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);
@ -382,8 +382,8 @@ public class TestLogRolling {
log.rollWriter(true);
batchWriteAndWait(table, 13, true, 10000);
assertTrue("New log file should have the default replication instead of " +
log.getLogReplication(),
log.getLogReplication() == fs.getDefaultReplication());
((FSHLog) log).getLogReplication(),
((FSHLog) log).getLogReplication() == fs.getDefaultReplication());
assertTrue("LowReplication Roller should've been enabled",
log.isLowReplicationRollEnabled());
}
@ -417,7 +417,7 @@ public class TestLogRolling {
this.log = server.getWAL();
final List<Path> paths = new ArrayList<Path>();
final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
paths.add(log.computeFilename());
paths.add(((FSHLog) log).computeFilename());
log.registerWALActionsListener(new WALActionsListener() {
@Override
public void preLogRoll(Path oldFile, Path newFile) {
@ -444,7 +444,7 @@ public class TestLogRolling {
WALEdit logEdit) {}
});
assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
// don't run this test without append support (HDFS-200 & HDFS-142)
assertTrue("Need append support for this test", FSUtils
.isAppendSupported(TEST_UTIL.getConfiguration()));
@ -498,7 +498,8 @@ public class TestLogRolling {
LOG.debug("Reading HLog "+FSUtils.getPath(p));
HLog.Reader reader = null;
try {
reader = HLog.getReader(fs, p, TEST_UTIL.getConfiguration());
reader = HLogFactory.createReader(fs, p,
TEST_UTIL.getConfiguration());
HLog.Entry entry;
while ((entry = reader.next()) != null) {
LOG.debug("#"+entry.getKey().getLogSeqNum()+": "+entry.getEdit().getKeyValues());

View File

@ -53,8 +53,9 @@ public class TestLogRollingNoCluster {
public void testContendedLogRolling() throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
Path dir = TEST_UTIL.getDataTestDir();
HLog wal = new HLog(fs, new Path(dir, "logs"), new Path(dir, "oldlogs"),
HLog wal = HLogFactory.createHLog(fs, dir, "logs",
TEST_UTIL.getConfiguration());
Appender [] appenders = null;
final int count = THREAD_COUNT;
@ -113,7 +114,7 @@ public class TestLogRollingNoCluster {
for (int i = 0; i < this.count; i++) {
long now = System.currentTimeMillis();
// Roll every ten edits if the log has anything in it.
if (i % 10 == 0 && this.wal.getNumEntries() > 0) {
if (i % 10 == 0 && ((FSHLog) this.wal).getNumEntries() > 0) {
this.wal.rollWriter();
}
WALEdit edit = new WALEdit();
@ -136,4 +137,7 @@ public class TestLogRollingNoCluster {
}
}
//@org.junit.Rule
//public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
// new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}

View File

@ -50,6 +50,7 @@ public class TestWALActionsListener {
private static FileSystem fs;
private static Path oldLogDir;
private static Path logDir;
private static String logName;
private static Configuration conf;
@BeforeClass
@ -59,8 +60,9 @@ public class TestWALActionsListener {
fs = FileSystem.get(conf);
oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
HConstants.HREGION_OLDLOGDIR_NAME);
logName = HConstants.HREGION_LOGDIR_NAME;
logDir = new Path(TEST_UTIL.getDataTestDir(),
HConstants.HREGION_LOGDIR_NAME);
logName);
}
@Before
@ -85,7 +87,8 @@ public class TestWALActionsListener {
List<WALActionsListener> list = new ArrayList<WALActionsListener>();
list.add(observer);
DummyWALActionsListener laterobserver = new DummyWALActionsListener();
HLog hlog = new HLog(fs, logDir, oldLogDir, conf, list, null);
HLog hlog = HLogFactory.createHLog(fs, TEST_UTIL.getDataTestDir(), logName,
conf, list, null);
HRegionInfo hri = new HRegionInfo(SOME_BYTES,
SOME_BYTES, SOME_BYTES, false);

View File

@ -71,6 +71,7 @@ public class TestWALReplay {
static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
private Path hbaseRootDir = null;
private String logName;
private Path oldLogDir;
private Path logDir;
private FileSystem fs;
@ -100,7 +101,8 @@ public class TestWALReplay {
this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
this.hbaseRootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME);
this.logName = HConstants.HREGION_LOGDIR_NAME;
this.logDir = new Path(this.hbaseRootDir, logName);
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
}
@ -408,7 +410,7 @@ public class TestWALReplay {
wal2.sync();
// Set down maximum recovery so we dfsclient doesn't linger retrying something
// long gone.
HBaseTestingUtility.setMaxRecoveryErrorCount(wal2.getOutputStream(), 1);
HBaseTestingUtility.setMaxRecoveryErrorCount(((FSHLog) wal2).getOutputStream(), 1);
final Configuration newConf = HBaseConfiguration.create(this.conf);
User user = HBaseTestingUtility.getDifferentUser(newConf,
tableNameStr);
@ -576,7 +578,7 @@ public class TestWALReplay {
wal.sync();
// Set down maximum recovery so we dfsclient doesn't linger retrying something
// long gone.
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
HBaseTestingUtility.setMaxRecoveryErrorCount(((FSHLog) wal).getOutputStream(), 1);
// Make a new conf and a new fs for the splitter to run on so we can take
// over old wal.
final Configuration newConf = HBaseConfiguration.create(this.conf);
@ -676,11 +678,11 @@ public class TestWALReplay {
lastestSeqNumber, editCount);
}
static class MockHLog extends HLog {
static class MockHLog extends FSHLog {
boolean doCompleteCacheFlush = false;
public MockHLog(FileSystem fs, Path dir, Path oldLogDir, Configuration conf) throws IOException {
super(fs, dir, oldLogDir, conf);
public MockHLog(FileSystem fs, Path rootDir, String logName, Configuration conf) throws IOException {
super(fs, rootDir, logName, conf);
}
@Override
@ -701,10 +703,10 @@ public class TestWALReplay {
}
private MockHLog createMockWAL(Configuration conf) throws IOException {
MockHLog wal = new MockHLog(FileSystem.get(conf), logDir, oldLogDir, conf);
MockHLog wal = new MockHLog(FileSystem.get(conf), hbaseRootDir, logName, conf);
// Set down maximum recovery so we dfsclient doesn't linger retrying something
// long gone.
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
HBaseTestingUtility.setMaxRecoveryErrorCount(((FSHLog) wal).getOutputStream(), 1);
return wal;
}
@ -784,10 +786,11 @@ public class TestWALReplay {
* @throws IOException
*/
private HLog createWAL(final Configuration c) throws IOException {
HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c);
HLog wal = HLogFactory.createHLog(FileSystem.get(c),
hbaseRootDir, logName, c);
// Set down maximum recovery so we dfsclient doesn't linger retrying something
// long gone.
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
HBaseTestingUtility.setMaxRecoveryErrorCount(((FSHLog) wal).getOutputStream(), 1);
return wal;
}

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
@ -77,7 +79,8 @@ public class TestReplicationSource {
Path logPath = new Path(logDir, "log");
if (!FS.exists(logDir)) FS.mkdirs(logDir);
if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
HLog.Writer writer = HLog.createWriter(FS, logPath, conf);
HLog.Writer writer = HLogFactory.createWriter(FS,
logPath, conf);
for(int i = 0; i < 3; i++) {
byte[] b = Bytes.toBytes(Integer.toString(i));
KeyValue kv = new KeyValue(b,b,b);
@ -89,7 +92,8 @@ public class TestReplicationSource {
}
writer.close();
HLog.Reader reader = HLog.getReader(FS, logPath, conf);
HLog.Reader reader = HLogFactory.createReader(FS,
logPath, conf);
HLog.Entry entry = reader.next();
assertNotNull(entry);

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -88,6 +89,8 @@ public class TestReplicationSourceManager {
private static FileSystem fs;
private static String logName;
private static Path oldLogDir;
private static Path logDir;
@ -122,6 +125,7 @@ public class TestReplicationSourceManager {
HConstants.HREGION_OLDLOGDIR_NAME);
logDir = new Path(utility.getDataTestDir(),
HConstants.HREGION_LOGDIR_NAME);
logName = HConstants.HREGION_LOGDIR_NAME;
manager.addSource(slaveId);
@ -164,8 +168,8 @@ public class TestReplicationSourceManager {
List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
listeners.add(replication);
HLog hlog = new HLog(fs, logDir, oldLogDir, conf, listeners,
URLEncoder.encode("regionserver:60020", "UTF8"));
HLog hlog = HLogFactory.createHLog(fs, utility.getDataTestDir(), logName,
conf, listeners, URLEncoder.encode("regionserver:60020", "UTF8"));
manager.init();
HTableDescriptor htd = new HTableDescriptor();

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -262,11 +263,14 @@ public class TestMergeTool extends HBaseTestCase {
}
// Create a log that we can reuse when we need to open regions
Path logPath = new Path("/tmp", HConstants.HREGION_LOGDIR_NAME + "_" +
System.currentTimeMillis());
LOG.info("Creating log " + logPath.toString());
Path oldLogDir = new Path("/tmp", HConstants.HREGION_OLDLOGDIR_NAME);
HLog log = new HLog(this.fs, logPath, oldLogDir, this.conf);
Path logPath = new Path("/tmp");
String logName = HConstants.HREGION_LOGDIR_NAME + "_"
+ System.currentTimeMillis();
LOG.info("Creating log " + logPath.toString() + "/" + logName);
HLog log = HLogFactory.createHLog(this.fs, logPath,
logName, this.conf);
try {
// Merge Region 0 and Region 1
HRegion merged = mergeAndVerify("merging regions 0 and 1",