HBASE-2580 Make the hlog file names unique
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@947127 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2d6804b03e
commit
4ae7a64206
|
@ -626,6 +626,7 @@ Release 0.21.0 - Unreleased
|
||||||
HBASE-2577 Remove 'core' maven module; move core up a level
|
HBASE-2577 Remove 'core' maven module; move core up a level
|
||||||
HBASE-2587 Coral where tests write data when running and make sure clean
|
HBASE-2587 Coral where tests write data when running and make sure clean
|
||||||
target removes all written
|
target removes all written
|
||||||
|
HBASE-2580 Make the hlog file names unique
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
HBASE-1961 HBase EC2 scripts
|
HBASE-1961 HBase EC2 scripts
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Chore;
|
import org.apache.hadoop.hbase.Chore;
|
||||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -48,8 +49,6 @@ public class OldLogsCleaner extends Chore {
|
||||||
private final Path oldLogDir;
|
private final Path oldLogDir;
|
||||||
private final LogCleanerDelegate logCleaner;
|
private final LogCleanerDelegate logCleaner;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
// We expect a file looking like hlog.dat.ts
|
|
||||||
private final Pattern pattern = Pattern.compile("\\d*\\.hlog\\.dat\\.\\d*");
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -92,7 +91,7 @@ public class OldLogsCleaner extends Chore {
|
||||||
int nbDeletedLog = 0;
|
int nbDeletedLog = 0;
|
||||||
for (FileStatus file : files) {
|
for (FileStatus file : files) {
|
||||||
Path filePath = file.getPath();
|
Path filePath = file.getPath();
|
||||||
if (pattern.matcher(filePath.getName()).matches()) {
|
if (HLog.validateHLogFilename(filePath.getName())) {
|
||||||
if (logCleaner.isLogDeletable(filePath) ) {
|
if (logCleaner.isLogDeletable(filePath) ) {
|
||||||
this.fs.delete(filePath, true);
|
this.fs.delete(filePath, true);
|
||||||
nbDeletedLog++;
|
nbDeletedLog++;
|
||||||
|
|
|
@ -40,12 +40,13 @@ public class TimeToLiveLogCleaner implements LogCleanerDelegate {
|
||||||
public boolean isLogDeletable(Path filePath) {
|
public boolean isLogDeletable(Path filePath) {
|
||||||
long time = 0;
|
long time = 0;
|
||||||
long currentTime = System.currentTimeMillis();
|
long currentTime = System.currentTimeMillis();
|
||||||
System.out.println(filePath.getName());
|
|
||||||
String[] parts = filePath.getName().split("\\.");
|
String[] parts = filePath.getName().split("\\.");
|
||||||
try {
|
try {
|
||||||
time = Long.parseLong(parts[3]);
|
time = Long.parseLong(parts[parts.length-1]);
|
||||||
} catch (NumberFormatException e) {
|
} catch (NumberFormatException e) {
|
||||||
e.printStackTrace();
|
LOG.error("Unable to parse the timestamp in " + filePath.getName() +
|
||||||
|
", deleting it since it's invalid and may not be a hlog", e);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
long life = currentTime - time;
|
long life = currentTime - time;
|
||||||
if (life < 0) {
|
if (life < 0) {
|
||||||
|
|
|
@ -939,7 +939,8 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
||||||
|
|
||||||
// instantiate
|
// instantiate
|
||||||
protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
|
protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
|
||||||
HLog newlog = new HLog(fs, logdir, oldLogDir, conf, hlogRoller);
|
HLog newlog = new HLog(fs, logdir, oldLogDir, conf, hlogRoller, null,
|
||||||
|
serverInfo.getServerAddress().toString());
|
||||||
return newlog;
|
return newlog;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
import java.net.URLEncoder;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -123,6 +125,7 @@ public class HLog implements HConstants, Syncable {
|
||||||
private final long optionalFlushInterval;
|
private final long optionalFlushInterval;
|
||||||
private final long blocksize;
|
private final long blocksize;
|
||||||
private final int flushlogentries;
|
private final int flushlogentries;
|
||||||
|
private final String prefix;
|
||||||
private final AtomicInteger unflushedEntries = new AtomicInteger(0);
|
private final AtomicInteger unflushedEntries = new AtomicInteger(0);
|
||||||
private final Path oldLogDir;
|
private final Path oldLogDir;
|
||||||
private final List<LogActionsListener> actionListeners =
|
private final List<LogActionsListener> actionListeners =
|
||||||
|
@ -209,6 +212,11 @@ public class HLog implements HConstants, Syncable {
|
||||||
*/
|
*/
|
||||||
private final LogSyncer logSyncerThread;
|
private final LogSyncer logSyncerThread;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pattern used to validate a HLog file name
|
||||||
|
*/
|
||||||
|
private static final Pattern pattern = Pattern.compile(".*\\.\\d*");
|
||||||
|
|
||||||
static byte [] COMPLETE_CACHE_FLUSH;
|
static byte [] COMPLETE_CACHE_FLUSH;
|
||||||
static {
|
static {
|
||||||
try {
|
try {
|
||||||
|
@ -262,7 +270,7 @@ public class HLog implements HConstants, Syncable {
|
||||||
public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
|
public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
|
||||||
final Configuration conf, final LogRollListener listener)
|
final Configuration conf, final LogRollListener listener)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this(fs, dir, oldLogDir, conf, listener, null);
|
this(fs, dir, oldLogDir, conf, listener, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -278,11 +286,14 @@ public class HLog implements HConstants, Syncable {
|
||||||
* @param conf configuration to use
|
* @param conf configuration to use
|
||||||
* @param listener listerner used to request log rolls
|
* @param listener listerner used to request log rolls
|
||||||
* @param actionListener optional listener for hlog actions like archiving
|
* @param actionListener optional listener for hlog actions like archiving
|
||||||
|
* @param prefix should always be hostname and port in distributed env and
|
||||||
|
* it will be URL encoded before being used.
|
||||||
|
* If prefix is null, "hlog" will be used
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
|
public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
|
||||||
final Configuration conf, final LogRollListener listener,
|
final Configuration conf, final LogRollListener listener,
|
||||||
final LogActionsListener actionListener)
|
final LogActionsListener actionListener, final String prefix)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
super();
|
super();
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
|
@ -316,6 +327,9 @@ public class HLog implements HConstants, Syncable {
|
||||||
if (actionListener != null) {
|
if (actionListener != null) {
|
||||||
addLogActionsListerner(actionListener);
|
addLogActionsListerner(actionListener);
|
||||||
}
|
}
|
||||||
|
// If prefix is null||empty then just name it hlog
|
||||||
|
this.prefix = prefix == null || prefix.isEmpty() ?
|
||||||
|
"hlog" : URLEncoder.encode(prefix, "UTF8");
|
||||||
// rollWriter sets this.hdfs_out if it can.
|
// rollWriter sets this.hdfs_out if it can.
|
||||||
rollWriter();
|
rollWriter();
|
||||||
|
|
||||||
|
@ -414,7 +428,7 @@ public class HLog implements HConstants, Syncable {
|
||||||
// Clean up current writer.
|
// Clean up current writer.
|
||||||
Path oldFile = cleanupCurrentWriter(this.filenum);
|
Path oldFile = cleanupCurrentWriter(this.filenum);
|
||||||
this.filenum = System.currentTimeMillis();
|
this.filenum = System.currentTimeMillis();
|
||||||
Path newPath = computeFilename(this.filenum);
|
Path newPath = computeFilename();
|
||||||
this.writer = createWriter(fs, newPath, HBaseConfiguration.create(conf));
|
this.writer = createWriter(fs, newPath, HBaseConfiguration.create(conf));
|
||||||
this.initialReplication = fs.getFileStatus(newPath).getReplication();
|
this.initialReplication = fs.getFileStatus(newPath).getReplication();
|
||||||
|
|
||||||
|
@ -627,7 +641,7 @@ public class HLog implements HConstants, Syncable {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
if (currentfilenum >= 0) {
|
if (currentfilenum >= 0) {
|
||||||
oldFile = computeFilename(currentfilenum);
|
oldFile = computeFilename();
|
||||||
this.outputfiles.put(Long.valueOf(this.logSeqNum.get() - 1), oldFile);
|
this.outputfiles.put(Long.valueOf(this.logSeqNum.get() - 1), oldFile);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -650,12 +664,13 @@ public class HLog implements HConstants, Syncable {
|
||||||
/**
|
/**
|
||||||
* This is a convenience method that computes a new filename with a given
|
* This is a convenience method that computes a new filename with a given
|
||||||
* file-number.
|
* file-number.
|
||||||
* @param fn
|
|
||||||
* @return Path
|
* @return Path
|
||||||
*/
|
*/
|
||||||
public Path computeFilename(final long fn) {
|
protected Path computeFilename() {
|
||||||
if (fn < 0) return null;
|
if (filenum < 0) {
|
||||||
return new Path(dir, HLOG_DATFILE + fn);
|
throw new RuntimeException("hlog file number can't be < 0");
|
||||||
|
}
|
||||||
|
return new Path(dir, prefix + "." + filenum);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1664,8 +1679,12 @@ public class HLog implements HConstants, Syncable {
|
||||||
return dirName.toString();
|
return dirName.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static boolean validateHLogFilename(String filename) {
|
||||||
|
return pattern.matcher(filename).matches();
|
||||||
|
}
|
||||||
|
|
||||||
private static Path getHLogArchivePath(Path oldLogDir, Path p) {
|
private static Path getHLogArchivePath(Path oldLogDir, Path p) {
|
||||||
return new Path(oldLogDir, System.currentTimeMillis() + "." + p.getName());
|
return new Path(oldLogDir, p.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void usage() {
|
private static void usage() {
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -34,6 +35,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
import java.net.URLEncoder;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
public class TestOldLogsCleaner {
|
public class TestOldLogsCleaner {
|
||||||
|
@ -74,32 +76,41 @@ public class TestOldLogsCleaner {
|
||||||
Configuration c = TEST_UTIL.getConfiguration();
|
Configuration c = TEST_UTIL.getConfiguration();
|
||||||
Path oldLogDir = new Path(TEST_UTIL.getTestDir(),
|
Path oldLogDir = new Path(TEST_UTIL.getTestDir(),
|
||||||
HConstants.HREGION_OLDLOGDIR_NAME);
|
HConstants.HREGION_OLDLOGDIR_NAME);
|
||||||
|
String fakeMachineName = URLEncoder.encode("regionserver:60020", "UTF8");
|
||||||
|
|
||||||
FileSystem fs = FileSystem.get(c);
|
FileSystem fs = FileSystem.get(c);
|
||||||
AtomicBoolean stop = new AtomicBoolean(false);
|
AtomicBoolean stop = new AtomicBoolean(false);
|
||||||
OldLogsCleaner cleaner = new OldLogsCleaner(1000, stop,c, fs, oldLogDir);
|
OldLogsCleaner cleaner = new OldLogsCleaner(1000, stop,c, fs, oldLogDir);
|
||||||
|
|
||||||
|
// Create 2 invalid files, 1 "recent" file, 1 very new file and 30 old files
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
fs.delete(oldLogDir, true);
|
fs.delete(oldLogDir, true);
|
||||||
fs.mkdirs(oldLogDir);
|
fs.mkdirs(oldLogDir);
|
||||||
fs.createNewFile(new Path(oldLogDir, "a"));
|
fs.createNewFile(new Path(oldLogDir, "a"));
|
||||||
fs.createNewFile(new Path(oldLogDir, "1.hlog.dat.a"));
|
fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a"));
|
||||||
fs.createNewFile(new Path(oldLogDir, "1.hlog.dat." + now));
|
fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now));
|
||||||
for(int i = 0; i < 30; i++) {
|
System.out.println("Now is: " + now);
|
||||||
fs.createNewFile(new Path(oldLogDir, 1 + "hlog.dat." +
|
for (int i = 0; i < 30; i++) {
|
||||||
(now - 6000000 - i)));
|
fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now - 6000000 - i) ));
|
||||||
}
|
}
|
||||||
fs.createNewFile(new Path(oldLogDir, "a.hlog.dat." + (now + 10000)));
|
for (FileStatus stat : fs.listStatus(oldLogDir)) {
|
||||||
|
System.out.println(stat.getPath().toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + 10000) ));
|
||||||
|
|
||||||
assertEquals(34, fs.listStatus(oldLogDir).length);
|
assertEquals(34, fs.listStatus(oldLogDir).length);
|
||||||
|
|
||||||
|
// This will take care of 20 old log files (default max we can delete)
|
||||||
cleaner.chore();
|
cleaner.chore();
|
||||||
|
|
||||||
assertEquals(14, fs.listStatus(oldLogDir).length);
|
assertEquals(14, fs.listStatus(oldLogDir).length);
|
||||||
|
|
||||||
|
// We will delete all remaining log files and those that are invalid
|
||||||
cleaner.chore();
|
cleaner.chore();
|
||||||
|
|
||||||
assertEquals(1, fs.listStatus(oldLogDir).length);
|
// We end up with the current log file and a newer one
|
||||||
|
assertEquals(2, fs.listStatus(oldLogDir).length);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -150,7 +150,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
|
||||||
// gives you EOFE.
|
// gives you EOFE.
|
||||||
wal.sync();
|
wal.sync();
|
||||||
// Open a Reader.
|
// Open a Reader.
|
||||||
Path walPath = wal.computeFilename(wal.getFilenum());
|
Path walPath = wal.computeFilename();
|
||||||
HLog.Reader reader = HLog.getReader(fs, walPath, conf);
|
HLog.Reader reader = HLog.getReader(fs, walPath, conf);
|
||||||
int count = 0;
|
int count = 0;
|
||||||
HLog.Entry entry = new HLog.Entry();
|
HLog.Entry entry = new HLog.Entry();
|
||||||
|
@ -281,7 +281,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
|
||||||
long logSeqId = log.startCacheFlush();
|
long logSeqId = log.startCacheFlush();
|
||||||
log.completeCacheFlush(regionName, tableName, logSeqId, info.isMetaRegion());
|
log.completeCacheFlush(regionName, tableName, logSeqId, info.isMetaRegion());
|
||||||
log.close();
|
log.close();
|
||||||
Path filename = log.computeFilename(log.getFilenum());
|
Path filename = log.computeFilename();
|
||||||
log = null;
|
log = null;
|
||||||
// Now open a reader on the log and assert append worked.
|
// Now open a reader on the log and assert append worked.
|
||||||
reader = HLog.getReader(fs, filename, conf);
|
reader = HLog.getReader(fs, filename, conf);
|
||||||
|
@ -349,7 +349,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
|
||||||
long logSeqId = log.startCacheFlush();
|
long logSeqId = log.startCacheFlush();
|
||||||
log.completeCacheFlush(hri.getRegionName(), tableName, logSeqId, false);
|
log.completeCacheFlush(hri.getRegionName(), tableName, logSeqId, false);
|
||||||
log.close();
|
log.close();
|
||||||
Path filename = log.computeFilename(log.getFilenum());
|
Path filename = log.computeFilename();
|
||||||
log = null;
|
log = null;
|
||||||
// Now open a reader on the log and assert append worked.
|
// Now open a reader on the log and assert append worked.
|
||||||
reader = HLog.getReader(fs, filename, conf);
|
reader = HLog.getReader(fs, filename, conf);
|
||||||
|
|
|
@ -84,7 +84,7 @@ public class TestLogActionsListener {
|
||||||
public void testActionListener() throws Exception {
|
public void testActionListener() throws Exception {
|
||||||
DummyLogActionsListener list = new DummyLogActionsListener();
|
DummyLogActionsListener list = new DummyLogActionsListener();
|
||||||
DummyLogActionsListener laterList = new DummyLogActionsListener();
|
DummyLogActionsListener laterList = new DummyLogActionsListener();
|
||||||
HLog hlog = new HLog(fs, logDir, oldLogDir, conf, null, list);
|
HLog hlog = new HLog(fs, logDir, oldLogDir, conf, null, list, null);
|
||||||
HRegionInfo hri = new HRegionInfo(new HTableDescriptor(SOME_BYTES),
|
HRegionInfo hri = new HRegionInfo(new HTableDescriptor(SOME_BYTES),
|
||||||
SOME_BYTES, SOME_BYTES, false);
|
SOME_BYTES, SOME_BYTES, false);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue