HBASE-2070 Collect HLogs and delete them after a period of time
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@911233 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 7571814071
commit 4c7414ec20
@@ -11,6 +11,7 @@ Release 0.21.0 - Unreleased
   HBASE-2212  Refactor out lucene dependencies from HBase
               (Kay Kay via Stack)
   HBASE-2219  stop using code mapping for method names in the RPC
   HBASE-1728  Column family scoping and cluster identification

BUG FIXES
   HBASE-1791  Timeout in IndexRecordWriter (Bradford Stephens via Andrew
@@ -393,6 +394,7 @@ Release 0.21.0 - Unreleased
   HBASE-1433  Update hbase build to match core, use ivy, publish jars to maven
               repo, etc. (Kay Kay via Stack)
   HBASE-2129  Simple Master/Slave replication
+  HBASE-2070  Collect HLogs and delete them after a period of time

OPTIMIZATIONS
   HBASE-410   [testing] Speed up the test suite
@@ -282,6 +282,13 @@
     takes longer than this interval, assign to a new regionserver.
     </description>
   </property>
+  <property>
+    <name>hbase.master.logcleaner.ttl</name>
+    <value>600000</value>
+    <description>Maximum time a log can stay in the .oldlogdir directory,
+    after which it will be cleaned by a master thread.
+    </description>
+  </property>
   <property>
     <name>hbase.regions.percheckin</name>
     <value>10</value>
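For illustration, a minimal sketch of the TTL comparison this property drives; the class and method names here are invented, only the configuration key and default come from the patch:

    import org.apache.hadoop.conf.Configuration;

    // Illustrative sketch: the same comparison the master's cleaner chore
    // performs below. 600000 ms is 10 minutes; logTimestamp is the close
    // time encoded in the archived file's name.
    public class LogTtlSketch {
      public static boolean isExpired(Configuration conf, long logTimestamp) {
        long ttl = conf.getLong("hbase.master.logcleaner.ttl", 600000);
        return System.currentTimeMillis() - logTimestamp > ttl;
      }
    }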
@@ -72,9 +72,10 @@ public class ReplicationRegionServer extends HRegionServer
  }

  @Override
-  protected HLog instantiateHLog(Path logdir) throws IOException {
+  protected HLog instantiateHLog(Path logdir, Path oldLogDir)
+      throws IOException {
    HLog newlog = new ReplicationHLog(super.getFileSystem(),
-        logdir, conf, super.getLogRoller(),
+        logdir, oldLogDir, conf, super.getLogRoller(),
        this.replicationSource);
    return newlog;
  }
@@ -56,11 +56,11 @@ public class ReplicationHLog extends HLog {
   * @throws IOException
   */
  public ReplicationHLog(final FileSystem fs, final Path dir,
-                         final Configuration conf,
+                         final Path oldLogDir, final Configuration conf,
                         final LogRollListener listener,
                         ReplicationSource replicationSource)
  throws IOException {
-    super(fs, dir, conf, listener);
+    super(fs, dir, oldLogDir, conf, listener);
    this.replicationSource = replicationSource;
    this.isReplicator = this.replicationSource != null;
  }
@@ -41,9 +41,9 @@ import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
 */
class THLog extends HLog {

-  public THLog(FileSystem fs, Path dir, Configuration conf,
+  public THLog(FileSystem fs, Path dir, Path oldLogDir, Configuration conf,
      LogRollListener listener) throws IOException {
-    super(fs, dir, conf, listener);
+    super(fs, dir, oldLogDir, conf, listener);
  }

  @Override
@@ -102,10 +102,11 @@ public class TransactionalRegionServer extends HRegionServer implements
  }

  @Override
-  protected HLog instantiateHLog(Path logdir) throws IOException {
+  protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
    conf.set("hbase.regionserver.hlog.keyclass",
        THLogKey.class.getCanonicalName());
-    HLog newlog = new THLog(super.getFileSystem(), logdir, conf, super.getLogRoller());
+    HLog newlog = new THLog(super.getFileSystem(), logdir, oldLogDir,
+        conf, super.getLogRoller());
    return newlog;
  }

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
public class TestTHLog extends HBaseTestCase implements
    HConstants {
  private Path dir;
+  private Path oldLogdir;
  private MiniDFSCluster cluster;

  final byte[] tableName = Bytes.toBytes("tablename");
@@ -62,6 +63,8 @@ public class TestTHLog extends HBaseTestCase implements
        THLogKey.class.getCanonicalName());
    super.setUp();
    this.dir = new Path("/hbase", getName());
+    this.oldLogdir = new Path("/hbase", getName()+"_old");
+
    if (fs.exists(dir)) {
      fs.delete(dir, true);
    }
@@ -81,7 +84,7 @@ public class TestTHLog extends HBaseTestCase implements
   */
  public void testSingleCommit() throws IOException {

-    THLog log = new THLog(fs, dir, this.conf, null);
+    THLog log = new THLog(fs, dir, oldLogdir, this.conf, null);
    THLogRecoveryManager logRecoveryMangaer = new THLogRecoveryManager(fs,
        regionInfo, conf);

@@ -114,7 +117,7 @@ public class TestTHLog extends HBaseTestCase implements
   */
  public void testSingleAbort() throws IOException {

-    THLog log = new THLog(fs, dir, this.conf, null);
+    THLog log = new THLog(fs, dir, oldLogdir, this.conf, null);
    THLogRecoveryManager logRecoveryMangaer = new THLogRecoveryManager(fs,
        regionInfo, conf);

@@ -143,7 +146,7 @@ public class TestTHLog extends HBaseTestCase implements
   */
  public void testInterlievedCommits() throws IOException {

-    THLog log = new THLog(fs, dir, this.conf, null);
+    THLog log = new THLog(fs, dir, oldLogdir, this.conf, null);
    THLogRecoveryManager logMangaer = new THLogRecoveryManager(fs, regionInfo,
        conf);

@@ -178,7 +181,7 @@ public class TestTHLog extends HBaseTestCase implements
   */
  public void testInterlievedAbortCommit() throws IOException {

-    THLog log = new THLog(fs, dir, this.conf, null);
+    THLog log = new THLog(fs, dir, oldLogdir, this.conf, null);
    THLogRecoveryManager logMangaer = new THLogRecoveryManager(fs, regionInfo,
        conf);

@@ -213,7 +216,7 @@ public class TestTHLog extends HBaseTestCase implements
   */
  public void testInterlievedCommitAbort() throws IOException {

-    THLog log = new THLog(fs, dir, this.conf, null);
+    THLog log = new THLog(fs, dir, oldLogdir, this.conf, null);
    THLogRecoveryManager logMangaer = new THLogRecoveryManager(fs, regionInfo,
        conf);

@@ -129,6 +129,9 @@ public interface HConstants {
   * Use '.' as a special character to separate the log files from table data */
  static final String HREGION_LOGDIR_NAME = ".logs";

+  /** Like the previous, but for old logs that are about to be deleted */
+  static final String HREGION_OLDLOGDIR_NAME = ".oldlogs";
+
  /** Name of old log file for reconstruction */
  static final String HREGION_OLDLOGFILE_NAME = "oldlogfile.log";

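With these two constants in place, a typical HBase root directory carries both a ".logs" directory for live write-ahead logs and a ".oldlogs" directory where rolled logs wait out the TTL before the master's cleaner deletes them.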
@@ -115,8 +115,9 @@ class HMerge implements HConstants {
      );
      Path logdir = new Path(tabledir, "merge_" + System.currentTimeMillis() +
          HREGION_LOGDIR_NAME);
+      Path oldLogDir = new Path(tabledir, HREGION_OLDLOGDIR_NAME);
      this.hlog =
-        new HLog(fs, logdir, conf, null);
+        new HLog(fs, logdir, oldLogDir, conf, null);
    }

    void process() throws IOException {
@@ -138,6 +138,8 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
  private final FileSystem fs;
  // Is the filesystem ok?
  private volatile boolean fsOk = true;
+  // The Path to the old logs dir
+  private final Path oldLogDir;

  // Queues for RegionServerOperation events. Includes server open, shutdown,
  // and region open and close.
@@ -172,6 +174,12 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
    this.fs = FileSystem.get(this.conf);
    checkRootDir(this.rootdir, this.conf, this.fs);

+    // Make sure the region servers can archive their old logs
+    this.oldLogDir = new Path(this.rootdir, HREGION_OLDLOGDIR_NAME);
+    if(!this.fs.exists(this.oldLogDir)) {
+      this.fs.mkdirs(this.oldLogDir);
+    }
+
    // Get my address and create an rpc server instance. The rpc-server port
    // can be ephemeral...ensure we have the correct info
    HServerAddress a = new HServerAddress(getMyAddress(this.conf));
@@ -395,6 +403,14 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
    return this.serverManager.getAverageLoad();
  }

+  /**
+   * Get the directory where old logs go
+   * @return the dir
+   */
+  public Path getOldLogDir() {
+    return this.oldLogDir;
+  }
+
  /**
   * Add to the passed <code>m</code> servers that are loaded less than
   * <code>l</code>.
@@ -630,7 +646,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
    Path logDir =
      new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
    try {
-      HLog.splitLog(this.rootdir, logDir, this.fs, getConfiguration());
+      HLog.splitLog(this.rootdir, logDir, oldLogDir, this.fs, getConfiguration());
    } catch (IOException e) {
      LOG.error("Failed splitting " + logDir.toString(), e);
    } finally {
@@ -0,0 +1,116 @@
/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.io.IOException;

/**
 * This Chore, every time it runs, will clear the logs in the old logs folder
 * that are older than hbase.master.logcleaner.ttl and, in order to limit the
 * number of deletes it sends, will only delete a maximum of 20 in a single run.
 */
public class OldLogsCleaner extends Chore {

  static final Log LOG = LogFactory.getLog(OldLogsCleaner.class.getName());

  // Configured time a log can be kept after it was closed
  private final long ttl;
  // Max number we can delete on every chore, this is to make sure we don't
  // issue thousands of delete commands around the same time
  private final int maxDeletedLogs;
  private final FileSystem fs;
  private final Path oldLogDir;
  // We expect a file looking like ts.hlog.dat.ts
  private final Pattern pattern = Pattern.compile("\\d*\\.hlog\\.dat\\.\\d*");

  /**
   * @param p how often the chore runs, in milliseconds
   * @param s reference to the master's shutdown flag
   * @param conf configuration to read the TTL from
   * @param fs filesystem holding the old logs directory
   * @param oldLogDir the directory to clean
   */
  public OldLogsCleaner(final int p, final AtomicBoolean s,
                        Configuration conf, FileSystem fs,
                        Path oldLogDir) {
    super(p, s);
    this.ttl = conf.getLong("hbase.master.logcleaner.ttl", 600000);
    this.maxDeletedLogs =
        conf.getInt("hbase.master.logcleaner.maxdeletedlogs", 20);
    this.fs = fs;
    this.oldLogDir = oldLogDir;
  }

  @Override
  protected void chore() {
    try {
      FileStatus[] files = this.fs.listStatus(this.oldLogDir);
      long currentTime = System.currentTimeMillis();
      int nbDeletedLog = 0;
      for (FileStatus file : files) {
        Path filePath = file.getPath();

        if (pattern.matcher(filePath.getName()).matches()) {
          String[] parts = filePath.getName().split("\\.");
          long time = 0;
          try {
            time = Long.parseLong(parts[3]);
          } catch (NumberFormatException e) {
            // won't happen
          }
          long life = currentTime - time;
          if (life < 0) {
            LOG.warn("Found a log newer than current time, " +
                "probably a clock skew");
            continue;
          }
          if (life > ttl) {
            this.fs.delete(filePath, true);
            nbDeletedLog++;
          }
        } else {
          LOG.warn("Found a wrongly formatted file: "
              + file.getPath().getName());
          this.fs.delete(filePath, true);
          nbDeletedLog++;
        }
        if (nbDeletedLog >= maxDeletedLogs) {
          break;
        }
      }
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
      LOG.warn("Error while cleaning the logs", e);
    }
  }
}
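A quick sketch of how the cleaner's name pattern behaves; the timestamps below are invented, and the "hlog.dat.<ts>" live-log naming is an assumption implied by the cleaner's own comment. Archived logs end up named <archiveTs>.hlog.dat.<rollTs> (see getHLogArchivePath further down), so splitting on '.' puts the roll timestamp at index 3:

    import java.util.regex.Pattern;

    // Illustrative sketch of the pattern used by OldLogsCleaner above.
    public class LogNameSketch {
      public static void main(String[] args) {
        Pattern p = Pattern.compile("\\d*\\.hlog\\.dat\\.\\d*");
        String name = "1266400000000.hlog.dat.1266399000000";
        System.out.println(p.matcher(name).matches());            // true
        System.out.println(Long.parseLong(name.split("\\.")[3])); // 1266399000000
        // Anything else counts as wrongly formatted and is deleted outright:
        System.out.println(p.matcher("not-a-log").matches());     // false
      }
    }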
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;

/**
 * Instantiated when a server's lease has expired, meaning it has crashed.
@@ -47,7 +48,7 @@ class ProcessServerShutdown extends RegionServerOperation {
  private boolean isRootServer;
  private List<MetaRegion> metaRegions;

-  private Path oldLogDir;
+  private Path rsLogDir;
  private boolean logSplit;
  private boolean rootRescanned;
  private HServerAddress deadServerAddress;
@@ -73,7 +74,7 @@ class ProcessServerShutdown extends RegionServerOperation {
    this.deadServerAddress = serverInfo.getServerAddress();
    this.logSplit = false;
    this.rootRescanned = false;
-    this.oldLogDir =
+    this.rsLogDir =
      new Path(master.getRootDir(), HLog.getHLogDirectoryName(serverInfo));

    // check to see if I am responsible for either ROOT or any of the META tables.
@@ -275,13 +276,13 @@ class ProcessServerShutdown extends RegionServerOperation {
        master.getRegionManager().numOnlineMetaRegions());
    if (!logSplit) {
      // Process the old log file
-      if (this.master.getFileSystem().exists(oldLogDir)) {
+      if (this.master.getFileSystem().exists(rsLogDir)) {
        if (!master.getRegionManager().splitLogLock.tryLock()) {
          return false;
        }
        try {
-          HLog.splitLog(master.getRootDir(), oldLogDir,
-            this.master.getFileSystem(),
+          HLog.splitLog(master.getRootDir(), rsLogDir,
+            this.master.getOldLogDir(), this.master.getFileSystem(),
            this.master.getConfiguration());
        } finally {
          master.getRegionManager().splitLogLock.unlock();
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
@@ -95,6 +96,8 @@ public class ServerManager implements HConstants {

  private int minimumServerCount;

+  private final OldLogsCleaner oldLogCleaner;
+
  /*
   * Dumps into log current stats on dead servers and number of servers
   * TODO: Make this a metric; dump metrics into log.
@@ -143,6 +146,13 @@ public class ServerManager implements HConstants {
    this.serverMonitorThread = new ServerMonitor(metaRescanInterval,
      this.master.getShutdownRequested());
    this.serverMonitorThread.start();
+    this.oldLogCleaner = new OldLogsCleaner(
+      c.getInt("hbase.master.meta.thread.rescanfrequency",60 * 1000),
+      this.master.getShutdownRequested(), c,
+      master.getFileSystem(), master.getOldLogDir());
+    Threads.setDaemonThreadRunning(oldLogCleaner,
+      "ServerManager.oldLogCleaner");
+
  }

  /**
@@ -1860,7 +1860,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
    FileSystem fs = FileSystem.get(conf);
    fs.mkdirs(regionDir);
    HRegion region = new HRegion(tableDir,
-      new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null),
+      new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME),
+          new Path(regionDir, HREGION_OLDLOGDIR_NAME), conf, null),
      fs, conf, info, null);
    region.initialize(null, null);
    return region;
@@ -2533,7 +2534,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
    FileSystem fs = FileSystem.get(c);
    Path logdir = new Path(c.get("hbase.tmp.dir"),
        "hlog" + tableDir.getName() + System.currentTimeMillis());
-    HLog log = new HLog(fs, logdir, c, null);
+    Path oldLogDir = new Path(c.get("hbase.tmp.dir"), HREGION_OLDLOGDIR_NAME);
+    HLog log = new HLog(fs, logdir, oldLogDir, c, null);
    try {
      processTable(fs, tableDir, log, c, majorCompact);
    } finally {
@@ -963,7 +963,7 @@ public class HRegionServer implements HConstants, HRegionInterface,

  private HLog setupHLog() throws RegionServerRunningException,
    IOException {
-
+    Path oldLogDir = new Path(rootDir, HREGION_OLDLOGDIR_NAME);
    Path logdir = new Path(rootDir, HLog.getHLogDirectoryName(this.serverInfo));
    if (LOG.isDebugEnabled()) {
      LOG.debug("Log dir " + logdir);
@@ -973,13 +973,13 @@ public class HRegionServer implements HConstants, HRegionInterface,
        "running at " + this.serverInfo.getServerAddress().toString() +
        " because logdir " + logdir.toString() + " exists");
    }
-    HLog newlog = instantiateHLog(logdir);
+    HLog newlog = instantiateHLog(logdir, oldLogDir);
    return newlog;
  }

  // instantiate
-  protected HLog instantiateHLog(Path logdir) throws IOException {
-    HLog newlog = new HLog(fs, logdir, conf, hlogRoller);
+  protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
+    HLog newlog = new HLog(fs, logdir, oldLogDir, conf, hlogRoller);
    return newlog;
  }

@@ -117,6 +117,7 @@ public class HLog implements HConstants, Syncable {
  private final long blocksize;
  private final int flushlogentries;
  private final AtomicInteger unflushedEntries = new AtomicInteger(0);
+  private final Path oldLogDir;

  public interface Reader {

@@ -255,8 +256,8 @@ public class HLog implements HConstants, Syncable {
   * @param listener
   * @throws IOException
   */
-  public HLog(final FileSystem fs, final Path dir, final Configuration conf,
-    final LogRollListener listener)
+  public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
+    final Configuration conf, final LogRollListener listener)
  throws IOException {
    super();
    this.fs = fs;
@@ -276,6 +277,10 @@ public class HLog implements HConstants, Syncable {
      throw new IOException("Target HLog directory already exists: " + dir);
    }
    fs.mkdirs(dir);
+    this.oldLogDir = oldLogDir;
+    if(!fs.exists(oldLogDir)) {
+      fs.mkdirs(this.oldLogDir);
+    }
    this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
    this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
    LOG.info("HLog configuration: blocksize=" + this.blocksize +
@@ -370,7 +375,7 @@ public class HLog implements HConstants, Syncable {
        // flushed (and removed from the lastSeqWritten map). Means can
        // remove all but currently open log file.
        for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
-          deleteLogFile(e.getValue(), e.getKey());
+          archiveLogFile(e.getValue(), e.getKey());
        }
        this.outputfiles.clear();
      } else {
@@ -459,7 +464,7 @@ public class HLog implements HConstants, Syncable {
          " from region " + Bytes.toString(oldestRegion));
    }
    for (Long seq : sequenceNumbers) {
-      deleteLogFile(this.outputfiles.remove(seq), seq);
+      archiveLogFile(this.outputfiles.remove(seq), seq);
    }
  }

@@ -552,10 +557,12 @@ public class HLog implements HConstants, Syncable {
    return oldFile;
  }

-  private void deleteLogFile(final Path p, final Long seqno) throws IOException {
-    LOG.info("removing old hlog file " + FSUtils.getPath(p) +
-      " whose highest sequence/edit id is " + seqno);
-    this.fs.delete(p, true);
+  private void archiveLogFile(final Path p, final Long seqno) throws IOException {
+    Path newPath = getHLogArchivePath(this.oldLogDir, p);
+    LOG.info("moving old hlog file " + FSUtils.getPath(p) +
+      " whose highest sequence/edit id is " + seqno + " to " +
+      FSUtils.getPath(newPath));
+    this.fs.rename(p, newPath);
  }

  /**
@@ -576,6 +583,13 @@ public class HLog implements HConstants, Syncable {
   */
  public void closeAndDelete() throws IOException {
    close();
+    FileStatus[] files = fs.listStatus(this.dir);
+    for(FileStatus file : files) {
+      fs.rename(file.getPath(),
+          getHLogArchivePath(this.oldLogDir, file.getPath()));
+    }
+    LOG.debug("Moved " + files.length + " log files to " +
+        FSUtils.getPath(this.oldLogDir));
    fs.delete(dir, true);
  }

@@ -991,13 +1005,15 @@ public class HLog implements HConstants, Syncable {
   * @param rootDir qualified root directory of the HBase instance
   * @param srcDir Directory of log files to split: e.g.
   * <code>${ROOTDIR}/log_HOST_PORT</code>
+   * @param oldLogDir directory where the processed logs are archived
   * @param fs FileSystem
   * @param conf HBaseConfiguration
   * @throws IOException
   */
  public static List<Path> splitLog(final Path rootDir, final Path srcDir,
-    final FileSystem fs, final Configuration conf)
-  throws IOException {
+    Path oldLogDir, final FileSystem fs, final Configuration conf)
+  throws IOException {
+
    long millis = System.currentTimeMillis();
    List<Path> splits = null;
    if (!fs.exists(srcDir)) {
@@ -1011,8 +1027,17 @@ public class HLog implements HConstants, Syncable {
    }
    LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
      srcDir.toString());
-    splits = splitLog(rootDir, logfiles, fs, conf);
+    splits = splitLog(rootDir, oldLogDir, logfiles, fs, conf);
    try {
+      FileStatus[] files = fs.listStatus(srcDir);
+      for(FileStatus file : files) {
+        Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
+        LOG.debug("Moving " + FSUtils.getPath(file.getPath()) + " to " +
+          FSUtils.getPath(newPath));
+        fs.rename(file.getPath(), newPath);
+      }
+      LOG.debug("Moved " + files.length + " log files to " +
+        FSUtils.getPath(oldLogDir));
      fs.delete(srcDir, true);
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
@@ -1062,9 +1087,8 @@ public class HLog implements HConstants, Syncable {
   * @return List of splits made.
   */
  private static List<Path> splitLog(final Path rootDir,
-    final FileStatus [] logfiles, final FileSystem fs,
-    final Configuration conf)
-  throws IOException {
+    Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
+    final Configuration conf) throws IOException {
    final Map<byte [], WriterAndPath> logWriters =
      new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR);
    List<Path> splits = null;
@@ -1139,12 +1163,13 @@ public class HLog implements HConstants, Syncable {
          } catch (IOException e) {
            LOG.warn("Close in finally threw exception -- continuing", e);
          }
-          // Delete the input file now so we do not replay edits. We could
+          // Archive the input file now so we do not replay edits. We could
          // have gotten here because of an exception. If so, probably
          // nothing we can do about it. Replaying it, it could work but we
          // could be stuck replaying for ever. Just continue though we
          // could have lost some edits.
-          fs.delete(logfiles[i].getPath(), true);
+          fs.rename(logfiles[i].getPath(),
+            getHLogArchivePath(oldLogDir, logfiles[i].getPath()));
        }
      }
      ExecutorService threadPool =
@@ -1342,6 +1367,12 @@ public class HLog implements HConstants, Syncable {
    return dirName.toString();
  }

+  // We create a new file name with a ts in front of it to make sure we almost
+  // certainly don't have a file name conflict.
+  private static Path getHLogArchivePath(Path oldLogDir, Path p) {
+    return new Path(oldLogDir, System.currentTimeMillis() + "." + p.getName());
+  }
+
  private static void usage() {
    System.err.println("Usage: java org.apache.hbase.HLog" +
      " {--dump <logfile>... | --split <logdir>...}");
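To make the naming concrete, a small sketch reproducing the helper's scheme; the paths and timestamps are invented:

    import org.apache.hadoop.fs.Path;

    // Illustrative sketch: what getHLogArchivePath produces. The prefix is
    // the archive-time timestamp, which keeps names unique when several
    // region servers archive into the shared old-logs directory.
    public class ArchivePathSketch {
      public static void main(String[] args) {
        Path oldLogDir = new Path("/hbase/.oldlogs");
        Path p = new Path("/hbase/.logs/hlog.dat.1266399000000");
        Path archived = new Path(oldLogDir,
            System.currentTimeMillis() + "." + p.getName());
        // e.g. /hbase/.oldlogs/1266400000000.hlog.dat.1266399000000
        System.out.println(archived);
      }
    }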
@@ -1372,6 +1403,7 @@ public class HLog implements HConstants, Syncable {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path baseDir = new Path(conf.get(HBASE_DIR));
+    Path oldLogDir = new Path(baseDir, HREGION_OLDLOGDIR_NAME);
    for (int i = 1; i < args.length; i++) {
      Path logPath = new Path(args[i]);
      if (!fs.exists(logPath)) {
@@ -1394,7 +1426,7 @@ public class HLog implements HConstants, Syncable {
        if (!fs.getFileStatus(logPath).isDir()) {
          throw new IOException(args[i] + " is not a directory");
        }
-        splitLog(baseDir, logPath, fs, conf);
+        splitLog(baseDir, logPath, oldLogDir, fs, conf);
      }
    }
  }
@@ -96,7 +96,9 @@ public class MetaUtils {
    if (this.log == null) {
      Path logdir = new Path(this.fs.getHomeDirectory(),
          HConstants.HREGION_LOGDIR_NAME + "_" + System.currentTimeMillis());
-      this.log = new HLog(this.fs, logdir, this.conf, null);
+      Path oldLogDir = new Path(this.fs.getHomeDirectory(),
+          HConstants.HREGION_OLDLOGDIR_NAME);
+      this.log = new HLog(this.fs, logdir, oldLogDir, this.conf, null);
    }
    return this.log;
  }
@@ -0,0 +1,104 @@
/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.junit.Assert.assertEquals;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;

import java.util.concurrent.atomic.AtomicBoolean;

public class TestOldLogsCleaner {

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  /**
   * @throws java.lang.Exception
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
  }

  /**
   * @throws java.lang.Exception
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
  }

  /**
   * @throws java.lang.Exception
   */
  @Before
  public void setUp() throws Exception {
  }

  /**
   * @throws java.lang.Exception
   */
  @After
  public void tearDown() throws Exception {
  }

  @Test
  public void testLogCleaning() throws Exception {
    Configuration c = TEST_UTIL.getConfiguration();
    Path oldLogDir = new Path(TEST_UTIL.getTestDir(),
        HConstants.HREGION_OLDLOGDIR_NAME);

    FileSystem fs = FileSystem.get(c);
    AtomicBoolean stop = new AtomicBoolean(false);
    OldLogsCleaner cleaner = new OldLogsCleaner(1000, stop, c, fs, oldLogDir);

    long now = System.currentTimeMillis();
    fs.delete(oldLogDir, true);
    fs.mkdirs(oldLogDir);
    fs.createNewFile(new Path(oldLogDir, "a"));
    fs.createNewFile(new Path(oldLogDir, "1.hlog.dat.a"));
    fs.createNewFile(new Path(oldLogDir, "1.hlog.dat." + now));
    for(int i = 0; i < 30; i++) {
      fs.createNewFile(new Path(oldLogDir, i + ".hlog.dat." + (now - 6000000)));
    }
    fs.createNewFile(new Path(oldLogDir, "a.hlog.dat." + (now + 10000)));

    assertEquals(34, fs.listStatus(oldLogDir).length);

    cleaner.chore();

    assertEquals(14, fs.listStatus(oldLogDir).length);

    cleaner.chore();

    assertEquals(1, fs.listStatus(oldLogDir).length);
  }

}
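The arithmetic behind the assertions: of the 34 files created, only "1.hlog.dat." + now is both well formed and younger than the 10-minute TTL. "a" and "1.hlog.dat.a" are wrongly formatted, the 30 files stamped now - 6000000 are 100 minutes old, and "a.hlog.dat." + (now + 10000) is wrongly formatted despite its future timestamp. The first chore() stops at the per-run cap of 20 deletes (34 - 20 = 14); the second removes the remaining 13, leaving 1.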
@@ -5,12 +5,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Progressable;

@@ -72,6 +73,7 @@ public class TestStore extends TestCase {
    //Setting up a Store
    Path basedir = new Path(DIR+methodName);
    Path logdir = new Path(DIR+methodName+"/logs");
+    Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
    HColumnDescriptor hcd = new HColumnDescriptor(family);
    HBaseConfiguration conf = new HBaseConfiguration();
    FileSystem fs = FileSystem.get(conf);
@@ -83,7 +85,7 @@ public class TestStore extends TestCase {
    HTableDescriptor htd = new HTableDescriptor(table);
    htd.addFamily(hcd);
    HRegionInfo info = new HRegionInfo(htd, null, null, false);
-    HLog hlog = new HLog(fs, logdir, conf, null);
+    HLog hlog = new HLog(fs, logdir, oldLogDir, conf, null);
    HRegion region = new HRegion(basedir, hlog, fs, conf, info, null);

    store = new Store(basedir, region, hcd, fs, reconstructionLog, conf,
@@ -95,7 +95,9 @@ public class TestStoreReconstruction {
    HTableDescriptor htd = new HTableDescriptor(TABLE);
    htd.addFamily(hcd);
    HRegionInfo info = new HRegionInfo(htd, null, null, false);
-    HLog log = new HLog(cluster.getFileSystem(), this.dir, conf, null);
+    Path oldLogDir = new Path(this.dir, HConstants.HREGION_OLDLOGDIR_NAME);
+    HLog log = new HLog(cluster.getFileSystem(),
+        this.dir, oldLogDir, conf, null);
    HRegion region = new HRegion(dir, log,
        cluster.getFileSystem(), conf, info, null);
    List<KeyValue> result = new ArrayList<KeyValue>();
@@ -132,7 +134,7 @@ public class TestStoreReconstruction {

    List<Path> splits =
        HLog.splitLog(new Path(conf.get(HConstants.HBASE_DIR)),
-            this.dir, cluster.getFileSystem(), conf);
+            this.dir, oldLogDir, cluster.getFileSystem(), conf);

    // Split should generate only 1 file since there's only 1 region
    assertTrue(splits.size() == 1);
@@ -39,6 +39,7 @@ import java.util.Map;
/** JUnit test case for HLog */
public class TestHLog extends HBaseTestCase implements HConstants {
  private Path dir;
+  private Path oldLogDir;
  private MiniDFSCluster cluster;

  @Override
@@ -55,6 +56,8 @@ public class TestHLog extends HBaseTestCase implements HConstants {
    if (fs.exists(dir)) {
      fs.delete(dir, true);
    }
+    this.oldLogDir = new Path("/hbase", HConstants.HREGION_OLDLOGDIR_NAME);
+
  }

  @Override
@@ -75,7 +78,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {

    final byte [] tableName = Bytes.toBytes(getName());
    final byte [] rowName = tableName;
-    HLog log = new HLog(this.fs, this.dir, this.conf, null);
+    HLog log = new HLog(this.fs, this.dir, this.oldLogDir, this.conf, null);
    final int howmany = 3;
    HRegionInfo[] infos = new HRegionInfo[3];
    for(int i = 0; i < howmany; i++) {
@@ -103,7 +106,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
        log.rollWriter();
      }
      List<Path> splits =
-        HLog.splitLog(this.testDir, this.dir, this.fs, this.conf);
+        HLog.splitLog(this.testDir, this.dir, this.oldLogDir, this.fs, this.conf);
      verifySplits(splits, howmany);
      log = null;
    } finally {
@@ -132,7 +135,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
    out.close();
    in.close();
    Path subdir = new Path(this.dir, "hlogdir");
-    HLog wal = new HLog(this.fs, subdir, this.conf, null);
+    HLog wal = new HLog(this.fs, subdir, this.oldLogDir, this.conf, null);
    final int total = 20;

    HRegionInfo info = new HRegionInfo(new HTableDescriptor(bytes),
@@ -261,7 +264,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
    final byte [] tableName = Bytes.toBytes("tablename");
    final byte [] row = Bytes.toBytes("row");
    HLog.Reader reader = null;
-    HLog log = new HLog(fs, dir, this.conf, null);
+    HLog log = new HLog(fs, dir, this.oldLogDir, this.conf, null);
    try {
      // Write columns named 1, 2, 3, etc. and then values of single byte
      // 1, 2, 3...
@@ -248,7 +248,8 @@ public class TestMergeTool extends HBaseTestCase {
    Path logPath = new Path("/tmp", HConstants.HREGION_LOGDIR_NAME + "_" +
        System.currentTimeMillis());
    LOG.info("Creating log " + logPath.toString());
-    HLog log = new HLog(this.fs, logPath, this.conf, null);
+    Path oldLogDir = new Path("/tmp", HConstants.HREGION_OLDLOGDIR_NAME);
+    HLog log = new HLog(this.fs, logPath, oldLogDir, this.conf, null);
    try {
      // Merge Region 0 and Region 1
      HRegion merged = mergeAndVerify("merging regions 0 and 1",