HBASE-8962 Clean up code and remove regular log splitting

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1504647 13f79535-47bb-0310-9956-ffa450edef68
Author: jxiang
Date:   2013-07-18 20:42:35 +00:00
Parent: b5ff002a1a
Commit: ac924da7af

15 changed files with 284 additions and 781 deletions
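The heart of the change, before the per-file hunks: the old per-instance splitter API is removed and the distributed code path becomes the only one. A minimal before/after sketch in Java, assembled only from calls that appear in the hunks below (rootDir, logDir, oldLogDir, fs and conf are assumed to be in scope):

    // Before: one single-use HLogSplitter instance per log directory.
    HLogSplitter splitter = HLogSplitter.createLogSplitter(conf, rootDir, logDir, oldLogDir, fs);
    List<Path> splitsBefore = splitter.splitLog();

    // After: a stateless static entry point; the instance constructor becomes
    // package private and no longer takes srcDir/oldLogDir.
    List<Path> splitsAfter = HLogSplitter.split(rootDir, logDir, oldLogDir, fs, conf);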

View File

@@ -25,6 +25,8 @@ import java.io.IOException;
 @InterfaceAudience.Private
 public class OrphanHLogAfterSplitException extends IOException {
+  private static final long serialVersionUID = -4363805979687710634L;
+
   /**
    * Create this exception without a message
    */

View File

@@ -728,10 +728,6 @@ public final class HConstants {
   public static final String LOCALHOST_IP = "127.0.0.1";
 
-  /** Conf key that enables distributed log splitting */
-  public static final String DISTRIBUTED_LOG_SPLITTING_KEY =
-    "hbase.master.distributed.log.splitting";
-
   /** Conf key that enables unflushed WAL edits directly being replayed to region servers */
   public static final String DISTRIBUTED_LOG_REPLAY_KEY = "hbase.master.distributed.log.replay";
   public static final boolean DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG = false;
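With the splitting switch gone, distributed log splitting is always on and only the replay switch remains. A small sketch of what a reader of this configuration sees after the commit (HBaseConfiguration.create() stands in for however the configuration is actually obtained):

    Configuration conf = HBaseConfiguration.create();
    // hbase.master.distributed.log.splitting is no longer read anywhere;
    // setting it has no effect.
    boolean replay = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
        HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);  // defaults to false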

View File

@@ -48,11 +48,9 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.InvalidFamilyOperationException;
-import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -89,7 +87,6 @@ public class MasterFileSystem {
   // create the split log lock
   final Lock splitLogLock = new ReentrantLock();
   final boolean distributedLogReplay;
-  final boolean distributedLogSplitting;
   final SplitLogManager splitLogManager;
   private final MasterServices services;
@@ -125,11 +122,7 @@ public class MasterFileSystem {
     // make sure the fs has the same conf
     fs.setConf(conf);
     this.splitLogManager = new SplitLogManager(master.getZooKeeper(), master.getConfiguration(),
-      master, services, master.getServerName());
-    this.distributedLogSplitting = conf.getBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
-    if (this.distributedLogSplitting) {
-      this.splitLogManager.finishInitialization(masterRecovery);
-    }
+      master, services, master.getServerName(), masterRecovery);
     this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
       HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
     // setup the filesystem variable
@@ -305,16 +298,7 @@ public class MasterFileSystem {
    * @throws IOException
    */
   public void splitMetaLog(final Set<ServerName> serverNames) throws IOException {
-    long splitTime = 0, splitLogSize = 0;
-    List<Path> logDirs = getLogDirs(serverNames);
-    splitLogManager.handleDeadWorkers(serverNames);
-    splitTime = EnvironmentEdgeManager.currentTimeMillis();
-    splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, META_FILTER);
-    splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
-    if (this.metricsMaster != null) {
-      this.metricsMaster.addMetaWALSplit(splitTime, splitLogSize);
-    }
+    splitLog(serverNames, META_FILTER);
   }
 
   private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
@@ -419,40 +403,13 @@ public class MasterFileSystem {
     long splitTime = 0, splitLogSize = 0;
     List<Path> logDirs = getLogDirs(serverNames);
-    if (distributedLogSplitting) {
-      splitLogManager.handleDeadWorkers(serverNames);
-      splitTime = EnvironmentEdgeManager.currentTimeMillis();
-      splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
-      splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
-    } else {
-      for(Path logDir: logDirs){
-        // splitLogLock ensures that dead region servers' logs are processed
-        // one at a time
-        this.splitLogLock.lock();
-        try {
-          HLogSplitter splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir, oldLogDir,
-            this.fs);
-          try {
-            // If FS is in safe mode, just wait till out of it.
-            FSUtils.waitOnSafeMode(conf, conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 1000));
-            splitter.splitLog();
-          } catch (OrphanHLogAfterSplitException e) {
-            LOG.warn("Retrying splitting because of:", e);
-            //An HLogSplitter instance can only be used once.  Get new instance.
-            splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir,
-              oldLogDir, this.fs);
-            splitter.splitLog();
-          }
-          splitTime = splitter.getTime();
-          splitLogSize = splitter.getSize();
-        } finally {
-          this.splitLogLock.unlock();
-        }
-      }
-    }
+    splitLogManager.handleDeadWorkers(serverNames);
+    splitTime = EnvironmentEdgeManager.currentTimeMillis();
+    splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
+    splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
     if (this.metricsMaster != null) {
-      if (filter == this.META_FILTER) {
+      if (filter == META_FILTER) {
         this.metricsMaster.addMetaWALSplit(splitTime, splitLogSize);
       } else {
         this.metricsMaster.addSplit(splitTime, splitLogSize);
@@ -469,6 +426,7 @@ public class MasterFileSystem {
    * needed populating the directory with necessary bootup files).
    * @throws IOException
    */
+  @SuppressWarnings("deprecation")
   private Path checkRootDir(final Path rd, final Configuration c,
     final FileSystem fs)
   throws IOException {

View File

@@ -149,19 +149,39 @@ public class SplitLogManager extends ZooKeeperListener {
   /**
    * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
-   *   Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf)}
+   *   Stoppable stopper, MasterServices master, ServerName serverName,
+   *   boolean masterRecovery, TaskFinisher tf)}
+   * with masterRecovery = false, and tf = null.  Used in unit tests.
+   *
+   * @param zkw the ZK watcher
+   * @param conf the HBase configuration
+   * @param stopper the stoppable in case anything is wrong
+   * @param master the master services
+   * @param serverName the master server name
+   */
+  public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
+      Stoppable stopper, MasterServices master, ServerName serverName) {
+    this(zkw, conf, stopper, master, serverName, false, null);
+  }
+
+  /**
+   * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
+   *   Stoppable stopper, MasterServices master, ServerName serverName,
+   *   boolean masterRecovery, TaskFinisher tf)}
    * that provides a task finisher for copying recovered edits to their final destination.
    * The task finisher has to be robust because it can be arbitrarily restarted or called
    * multiple times.
    *
-   * @param zkw
-   * @param conf
-   * @param stopper
-   * @param serverName
+   * @param zkw the ZK watcher
+   * @param conf the HBase configuration
+   * @param stopper the stoppable in case anything is wrong
+   * @param master the master services
+   * @param serverName the master server name
+   * @param masterRecovery an indication if the master is in recovery
    */
   public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
-      Stoppable stopper, MasterServices master, ServerName serverName) {
-    this(zkw, conf, stopper, master, serverName, new TaskFinisher() {
+      Stoppable stopper, MasterServices master, ServerName serverName, boolean masterRecovery) {
+    this(zkw, conf, stopper, master, serverName, masterRecovery, new TaskFinisher() {
       @Override
       public Status finish(ServerName workerName, String logfile) {
         try {
@@ -180,14 +200,17 @@ public class SplitLogManager extends ZooKeeperListener {
    * does lookup the orphan tasks in zk but it doesn't block waiting for them
    * to be done.
    *
-   * @param zkw
-   * @param conf
-   * @param stopper
-   * @param serverName
+   * @param zkw the ZK watcher
+   * @param conf the HBase configuration
+   * @param stopper the stoppable in case anything is wrong
+   * @param master the master services
+   * @param serverName the master server name
+   * @param masterRecovery an indication if the master is in recovery
    * @param tf task finisher
    */
   public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
-      Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf) {
+      Stoppable stopper, MasterServices master,
+      ServerName serverName, boolean masterRecovery, TaskFinisher tf) {
     super(zkw);
     this.taskFinisher = tf;
     this.conf = conf;
@@ -208,9 +231,7 @@ public class SplitLogManager extends ZooKeeperListener {
     this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
       HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
     LOG.info("distributedLogReplay = " + this.distributedLogReplay);
-  }
 
-  public void finishInitialization(boolean masterRecovery) {
     if (!masterRecovery) {
       Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
         + ".splitLogManagerTimeoutMonitor");
@@ -1646,12 +1667,4 @@ public class SplitLogManager extends ZooKeeperListener {
       return statusMsg;
     }
   }
-
-  /**
-   * Completes the initialization
-   */
-  public void finishInitialization() {
-    finishInitialization(false);
-  }
 }
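The two-step construct-then-finishInitialization() pattern is folded into the constructors. A sketch of the resulting call sites, using only signatures from the hunks above (zkw, conf, stopper, master, serverName and masterRecovery are assumed to be set up as in the tests further down):

    // Test-style wrapper: masterRecovery = false, no TaskFinisher.
    SplitLogManager slm = new SplitLogManager(zkw, conf, stopper, master, serverName);

    // Master-style wrapper: supplies the recovered-edits TaskFinisher; the
    // constructor itself starts the timeout monitor unless masterRecovery is true.
    SplitLogManager slm2 =
        new SplitLogManager(zkw, conf, stopper, master, serverName, masterRecovery);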

View File

@@ -866,6 +866,7 @@ class FSHLog implements HLog, Syncable {
    * @return txid of this transaction
    * @throws IOException
    */
+  @SuppressWarnings("deprecation")
   private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
     final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore)
   throws IOException {
@@ -1342,15 +1343,13 @@ class FSHLog implements HLog, Syncable {
     if (!fs.exists(p)) {
       throw new FileNotFoundException(p.toString());
     }
-    final Path baseDir = FSUtils.getRootDir(conf);
-    final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
     if (!fs.getFileStatus(p).isDir()) {
       throw new IOException(p + " is not a directory");
     }
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(
-      conf, baseDir, p, oldLogDir, fs);
-    logSplitter.splitLog();
+    final Path baseDir = FSUtils.getRootDir(conf);
+    final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    HLogSplitter.split(baseDir, p, oldLogDir, fs, conf);
   }
 
   @Override

View File

@@ -22,13 +22,9 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.ConnectException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -39,15 +35,12 @@ import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -58,7 +51,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -71,13 +63,10 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -85,7 +74,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
@@ -98,15 +86,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKTable;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -118,19 +102,10 @@ import com.google.common.collect.Lists;
  */
 @InterfaceAudience.Private
 public class HLogSplitter {
-  private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
-
   static final Log LOG = LogFactory.getLog(HLogSplitter.class);
 
-  private boolean hasSplit = false;
-  private long splitTime = 0;
-  private long splitSize = 0;
-
   // Parameters for split process
   protected final Path rootDir;
-  protected final Path srcDir;
-  protected final Path oldLogDir;
   protected final FileSystem fs;
   protected final Configuration conf;
@@ -173,61 +148,10 @@ public class HLogSplitter {
   // Min batch size when replay WAL edits
   private final int minBatchSize;
 
-  /**
-   * Create a new HLogSplitter using the given {@link Configuration} and the
-   * <code>hbase.hlog.splitter.impl</code> property to derived the instance class to use.
-   * distributedLogReplay won't be enabled by this constructor.
-   * <p>
-   * @param conf
-   * @param rootDir hbase directory
-   * @param srcDir logs directory
-   * @param oldLogDir directory where processed logs are archived to
-   * @param fs FileSystem
-   * @return New HLogSplitter instance
-   */
-  public static HLogSplitter createLogSplitter(Configuration conf,
-      final Path rootDir, final Path srcDir,
-      Path oldLogDir, final FileSystem fs) {
-    @SuppressWarnings("unchecked")
-    Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
-        .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
-    try {
-      Constructor<? extends HLogSplitter> constructor =
-          splitterClass.getConstructor(
-              Configuration.class, // conf
-              Path.class, // rootDir
-              Path.class, // srcDir
-              Path.class, // oldLogDir
-              FileSystem.class, // fs
-              LastSequenceId.class);
-      return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs, null);
-    } catch (IllegalArgumentException e) {
-      throw new RuntimeException(e);
-    } catch (InstantiationException e) {
-      throw new RuntimeException(e);
-    } catch (IllegalAccessException e) {
-      throw new RuntimeException(e);
-    } catch (InvocationTargetException e) {
-      throw new RuntimeException(e);
-    } catch (SecurityException e) {
-      throw new RuntimeException(e);
-    } catch (NoSuchMethodException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
-      Path oldLogDir, FileSystem fs, LastSequenceId idChecker) {
-    this(conf, rootDir, srcDir, oldLogDir, fs, idChecker, null);
-  }
-
-  public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
-      Path oldLogDir, FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw) {
+  HLogSplitter(Configuration conf, Path rootDir,
+      FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw) {
     this.conf = conf;
     this.rootDir = rootDir;
-    this.srcDir = srcDir;
-    this.oldLogDir = oldLogDir;
     this.fs = fs;
     this.sequenceIdChecker = idChecker;
     this.watcher = zkw;
@@ -253,198 +177,8 @@ public class HLogSplitter {
   }
 
   /**
-   * Split up a bunch of regionserver commit log files that are no longer being
-   * written to, into new files, one per region for region to replay on startup.
-   * Delete the old log files when finished.
-   *
-   * @throws IOException will throw if corrupted hlogs aren't tolerated
-   * @return the list of splits
-   */
-  public List<Path> splitLog()
-      throws IOException {
-    return splitLog((CountDownLatch) null);
-  }
-
-  /**
-   * Split up a bunch of regionserver commit log files that are no longer being
-   * written to, into new files, one per region for region to replay on startup.
-   * Delete the old log files when finished.
-   *
-   * @param latch
-   * @throws IOException will throw if corrupted hlogs aren't tolerated
-   * @return the list of splits
-   */
-  public List<Path> splitLog(CountDownLatch latch)
-      throws IOException {
-    Preconditions.checkState(!hasSplit,
-      "An HLogSplitter instance may only be used once");
-    hasSplit = true;
-    status = TaskMonitor.get().createStatus(
-      "Splitting logs in " + srcDir);
-    long startTime = EnvironmentEdgeManager.currentTimeMillis();
-    status.setStatus("Determining files to split...");
-    List<Path> splits = null;
-    if (!fs.exists(srcDir)) {
-      // Nothing to do
-      status.markComplete("No log directory existed to split.");
-      return splits;
-    }
-    FileStatus[] logfiles = fs.listStatus(srcDir);
-    if (logfiles == null || logfiles.length == 0) {
-      // Nothing to do
-      return splits;
-    }
-    logAndReport("Splitting " + logfiles.length + " hlog(s) in "
-      + srcDir.toString());
-    splits = splitLog(logfiles, latch);
-    splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
-    String msg = "hlog file splitting completed in " + splitTime +
-      " ms for " + srcDir.toString();
-    status.markComplete(msg);
-    LOG.info(msg);
-    return splits;
-  }
-
-  private void logAndReport(String msg) {
-    status.setStatus(msg);
-    LOG.info(msg);
-  }
-
-  /**
-   * @return time that this split took
-   */
-  public long getTime() {
-    return this.splitTime;
-  }
-
-  /**
-   * @return aggregate size of hlogs that were split
-   */
-  public long getSize() {
-    return this.splitSize;
-  }
-
-  /**
-   * @return a map from encoded region ID to the number of edits written out
-   * for that region.
-   */
-  Map<byte[], Long> getOutputCounts() {
-    Preconditions.checkState(hasSplit);
-    return outputSink.getOutputCounts();
-  }
-
-  /**
-   * Splits or Replays the HLog edits in the given list of logfiles (that are a mix of edits on
-   * multiple regions) by region and then splits(or replay when distributedLogReplay is true) them
-   * per region directories, in batches.
-   * <p>
-   * This process is split into multiple threads. In the main thread, we loop through the logs to be
-   * split. For each log, we:
-   * <ul>
-   *   <li>Recover it (take and drop HDFS lease) to ensure no other process can write</li>
-   *   <li>Read each edit (see {@link #parseHLog}</li>
-   *   <li>Mark as "processed" or "corrupt" depending on outcome</li>
-   * </ul>
-   * <p>
-   * Each edit is passed into the EntryBuffers instance, which takes care of memory accounting and
-   * splitting the edits by region.
-   * <p>
-   * The OutputSink object then manages N other WriterThreads which pull chunks of edits from
-   * EntryBuffers and write them to either recovered.edits files or replay them to newly assigned
-   * region servers directly
-   * <p>
-   * After the process is complete, the log files are archived to a separate directory.
-   */
-  private List<Path> splitLog(final FileStatus[] logfiles, CountDownLatch latch)
-      throws IOException {
-    List<Path> processedLogs = new ArrayList<Path>(logfiles.length);
-    List<Path> corruptedLogs = new ArrayList<Path>(logfiles.length);
-    List<Path> splits;
-
-    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
-
-    countTotalBytes(logfiles);
-    splitSize = 0;
-
-    outputSink.startWriterThreads();
-
-    try {
-      int i = 0;
-      for (FileStatus log : logfiles) {
-        Path logPath = log.getPath();
-        long logLength = log.getLen();
-        splitSize += logLength;
-        logAndReport("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
-          + ": " + logPath + ", length=" + logLength);
-        Reader in = null;
-        try {
-          //actually, for meta-only hlogs, we don't need to go thru the process
-          //of parsing and segregating by regions since all the logs are for
-          //meta only. However, there is a sequence number that can be obtained
-          //only by parsing.. so we parse for all files currently
-          //TODO: optimize this part somehow
-          in = getReader(fs, log, conf, skipErrors, null);
-          if (in != null) {
-            parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);
-          }
-          processedLogs.add(logPath);
-        } catch (CorruptedLogFileException e) {
-          LOG.info("Got while parsing hlog " + logPath +
-            ". Marking as corrupted", e);
-          corruptedLogs.add(logPath);
-        } finally {
-          if (in != null) {
-            try {
-              in.close();
-            } catch (IOException e) {
-              LOG.warn("Close log reader threw exception -- continuing", e);
-            }
-          }
-        }
-      }
-      status.setStatus("Log splits complete. Checking for orphaned logs.");
-      if (latch != null) {
-        try {
-          latch.await();
-        } catch (InterruptedException ie) {
-          LOG.warn("wait for latch interrupted");
-          Thread.currentThread().interrupt();
-        }
-      }
-      FileStatus[] currFiles = fs.listStatus(srcDir);
-      if (currFiles.length > processedLogs.size()
-          + corruptedLogs.size()) {
-        throw new OrphanHLogAfterSplitException(
-          "Discovered orphan hlog after split. Maybe the "
-          + "HRegionServer was not dead when we started");
-      }
-    } finally {
-      status.setStatus("Finishing writing output logs and closing down.");
-      splits = outputSink.finishWritingAndClose();
-    }
-    status.setStatus("Archiving logs after completed split");
-    archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
-    return splits;
-  }
-
-  /**
-   * @return the total size of the passed list of files.
-   */
-  private static long countTotalBytes(FileStatus[] logfiles) {
-    long ret = 0;
-    for (FileStatus stat : logfiles) {
-      ret += stat.getLen();
-    }
-    return ret;
-  }
-
-  /**
-   * Splits a HLog file into region's recovered-edits directory
+   * Splits a HLog file into region's recovered-edits directory.
+   * This is the main entry point for distributed log splitting from SplitLogWorker.
    * <p>
    * If the log file has N regions then N recovered.edits files will be produced.
    * <p>
@@ -459,34 +193,40 @@ public class HLogSplitter {
    * @return false if it is interrupted by the progress-able.
    * @throws IOException
    */
-  static public boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
+  public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
-      ZooKeeperWatcher zkw)
-      throws IOException {
-    HLogSplitter s = new HLogSplitter(conf, rootDir, null, null/* oldLogDir */, fs, idChecker, zkw);
+      ZooKeeperWatcher zkw) throws IOException {
+    HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw);
     return s.splitLogFile(logfile, reporter);
   }
 
-  /**
-   * Splits a HLog file into region's recovered-edits directory
-   * <p>
-   * If the log file has N regions then N recovered.edits files will be produced.
-   * <p>
-   * @param rootDir
-   * @param logfile
-   * @param fs
-   * @param conf
-   * @param reporter
-   * @return false if it is interrupted by the progress-able.
-   * @throws IOException
-   */
-  static public boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
-      Configuration conf, CancelableProgressable reporter)
-      throws IOException {
-    return HLogSplitter.splitLogFile(rootDir, logfile, fs, conf, reporter, null, null);
-  }
+  // A wrapper to split one log folder using the method used by distributed
+  // log splitting. Used by tools and unit tests. It should be package private.
+  // It is public only because TestWALObserver is in a different package,
+  // which uses this method to do log splitting.
+  public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
+      FileSystem fs, Configuration conf) throws IOException {
+    FileStatus[] logfiles = fs.listStatus(logDir);
+    List<Path> splits = new ArrayList<Path>();
+    if (logfiles != null && logfiles.length > 0) {
+      for (FileStatus logfile: logfiles) {
+        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null);
+        if (s.splitLogFile(logfile, null)) {
+          finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
+          if (s.outputSink.splits != null) {
+            splits.addAll(s.outputSink.splits);
+          }
+        }
+      }
+    }
+    if (!fs.delete(logDir, true)) {
+      throw new IOException("Unable to delete src dir: " + logDir);
+    }
+    return splits;
+  }
 
-  public boolean splitLogFile(FileStatus logfile,
+  // The real log splitter. It just splits one log file.
+  boolean splitLogFile(FileStatus logfile,
       CancelableProgressable reporter) throws IOException {
     boolean isCorrupted = false;
     Preconditions.checkState(status == null);
@@ -615,31 +355,31 @@ public class HLogSplitter {
    * @param conf
    * @throws IOException
    */
-  public static void finishSplitLogFile(String logfile, Configuration conf)
-      throws IOException {
+  public static void finishSplitLogFile(String logfile,
+      Configuration conf) throws IOException {
     Path rootdir = FSUtils.getRootDir(conf);
     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
-    finishSplitLogFile(rootdir, oldLogDir, logfile, conf);
-  }
-
-  public static void finishSplitLogFile(Path rootdir, Path oldLogDir,
-      String logfile, Configuration conf) throws IOException {
-    List<Path> processedLogs = new ArrayList<Path>();
-    List<Path> corruptedLogs = new ArrayList<Path>();
-    FileSystem fs;
-    fs = rootdir.getFileSystem(conf);
-    Path logPath = null;
+    Path logPath;
     if (FSUtils.isStartingWithPath(rootdir, logfile)) {
       logPath = new Path(logfile);
     } else {
       logPath = new Path(rootdir, logfile);
     }
+    finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
+  }
+
+  static void finishSplitLogFile(Path rootdir, Path oldLogDir,
+      Path logPath, Configuration conf) throws IOException {
+    List<Path> processedLogs = new ArrayList<Path>();
+    List<Path> corruptedLogs = new ArrayList<Path>();
+    FileSystem fs;
+    fs = rootdir.getFileSystem(conf);
     if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
       corruptedLogs.add(logPath);
     } else {
       processedLogs.add(logPath);
     }
-    archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf);
+    archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
     Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
     fs.delete(stagingDir, true);
   }
@@ -657,7 +397,6 @@ public class HLogSplitter {
    * @throws IOException
    */
   private static void archiveLogs(
-      final Path srcDir,
       final List<Path> corruptedLogs,
       final List<Path> processedLogs, final Path oldLogDir,
       final FileSystem fs, final Configuration conf) throws IOException {
@@ -692,12 +431,6 @@ public class HLogSplitter {
         }
       }
     }
-
-    // distributed log splitting removes the srcDir (region's log dir) later
-    // when all the log files in that srcDir have been successfully processed
-    if (srcDir != null && !fs.delete(srcDir, true)) {
-      throw new IOException("Unable to delete src dir: " + srcDir);
-    }
   }
 
   /**
@@ -774,38 +507,6 @@ public class HLogSplitter {
     return String.format("%019d", seqid);
   }
 
-  /**
-   * Parse a single hlog and put the edits in entryBuffers
-   *
-   * @param in the hlog reader
-   * @param path the path of the log file
-   * @param entryBuffers the buffer to hold the parsed edits
-   * @param fs the file system
-   * @param conf the configuration
-   * @param skipErrors indicator if CorruptedLogFileException should be thrown instead of IOException
-   * @throws IOException
-   * @throws CorruptedLogFileException if hlog is corrupted
-   */
-  private void parseHLog(final Reader in, Path path,
-      EntryBuffers entryBuffers, final FileSystem fs,
-      final Configuration conf, boolean skipErrors)
-      throws IOException, CorruptedLogFileException {
-    int editsCount = 0;
-    try {
-      Entry entry;
-      while ((entry = getNextLogLine(in, path, skipErrors)) != null) {
-        entryBuffers.appendEntry(entry);
-        editsCount++;
-      }
-    } catch (InterruptedException ie) {
-      IOException t = new InterruptedIOException();
-      t.initCause(ie);
-      throw t;
-    } finally {
-      LOG.debug("Pushed=" + editsCount + " entries from " + path);
-    }
-  }
-
   /**
    * Create a new {@link Reader} for reading logs to split.
    *
@@ -823,7 +524,6 @@ public class HLogSplitter {
     long length = file.getLen();
     Reader in;
 
-
     // Check for possibly empty file. With appends, currently Hadoop reports a
     // zero length even if the file has been sync'd. Revisit if HDFS-376 or
     // HDFS-878 is committed.
@@ -896,7 +596,6 @@ public class HLogSplitter {
     }
   }
 
-
   private void writerThreadError(Throwable t) {
     thrown.compareAndSet(null, t);
   }
@@ -1078,7 +777,6 @@ public class HLogSplitter {
     }
   }
 
-
   class WriterThread extends Thread {
     private volatile boolean shouldStop = false;
     private OutputSink outputSink = null;
@@ -1127,7 +825,6 @@ public class HLogSplitter {
     }
   }
 
-
   private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
     outputSink.append(buffer);
   }
@@ -1140,37 +837,6 @@ public class HLogSplitter {
     }
   }
 
-  Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
-    List<String> components = new ArrayList<String>(10);
-    do {
-      components.add(edits.getName());
-      edits = edits.getParent();
-    } while (edits.depth() > rootdir.depth());
-    Path ret = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
-    for (int i = components.size() - 1; i >= 0; i--) {
-      ret = new Path(ret, components.get(i));
-    }
-    try {
-      if (fs.exists(ret)) {
-        LOG.warn("Found existing old temporary edits file. It could be the "
-          + "result of a previous failed split attempt. Deleting "
-          + ret + ", length="
-          + fs.getFileStatus(ret).getLen());
-        if (!fs.delete(ret, false)) {
-          LOG.warn("Failed delete of old " + ret);
-        }
-      }
-      Path dir = ret.getParent();
-      if (!fs.exists(dir)) {
-        if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
-      }
-    } catch (IOException e) {
-      LOG.warn("Could not prepare temp staging area ", e);
-      // ignore, exceptions will be thrown elsewhere
-    }
-    return ret;
-  }
-
   /**
    * The following class is an abstraction class to provide a common interface to support both
    * existing recovered edits file sink and region server WAL edits replay sink
@@ -1199,6 +865,8 @@ public class HLogSplitter {
     protected AtomicLong skippedEdits = new AtomicLong();
 
+    protected List<Path> splits = null;
+
     public OutputSink(int numWriters) {
       numThreads = numWriters;
     }
@@ -1334,7 +1002,10 @@ public class HLogSplitter {
           throw MultipleIOException.createIOException(thrown);
         }
       }
-      return (isSuccessful) ? result : null;
+      if (isSuccessful) {
+        splits = result;
+      }
+      return splits;
     }
 
     /**
@@ -2003,16 +1674,17 @@ public class HLogSplitter {
     @Override
     List<Path> finishWritingAndClose() throws IOException {
-      List<Path> result = new ArrayList<Path>();
       try {
         if (!finishWriting()) {
           return null;
         }
         if (hasEditsInDisablingOrDisabledTables) {
-          result = logRecoveredEditsOutputSink.finishWritingAndClose();
+          splits = logRecoveredEditsOutputSink.finishWritingAndClose();
+        } else {
+          splits = new ArrayList<Path>();
         }
         // returns an empty array in order to keep interface same as old way
-        return result;
+        return splits;
       } finally {
         List<IOException> thrown = closeRegionServerWriters();
         if (thrown != null && !thrown.isEmpty()) {
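Taken together, HLogSplitter is left with two static entry points. A usage sketch based only on calls visible in this diff, not on the workers' exact code (rootDir, logDir, oldLogDir, logfile, fs, conf, reporter, idChecker and zkw are assumed to be in scope):

    // 1) Split a single file -- the distributed path driven by SplitLogWorker;
    //    successful work is archived afterwards via finishSplitLogFile.
    if (HLogSplitter.splitLogFile(rootDir, logfile, fs, conf, reporter, idChecker, zkw)) {
      HLogSplitter.finishSplitLogFile(logfile.getPath().toString(), conf);
    }

    // 2) Split a whole log folder -- the wrapper used by tools and unit tests.
    List<Path> splits = HLogSplitter.split(rootDir, logDir, oldLogDir, fs, conf);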

View File

@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

View File

@@ -353,10 +353,8 @@ public class TestWALObserver {
   }
 
   private Path runWALSplit(final Configuration c) throws IOException {
-    FileSystem fs = FileSystem.get(c);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c,
-      this.hbaseRootDir, this.logDir, this.oldLogDir, fs);
-    List<Path> splits = logSplitter.splitLog();
+    List<Path> splits = HLogSplitter.split(
+      hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c);
     // Split should generate only 1 file since there's only 1 region
     assertEquals(1, splits.size());
     // Make sure the file exists

View File

@@ -84,7 +84,6 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKTable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.log4j.Level;
@@ -130,7 +129,6 @@ public class TestDistributedLogSplitting {
     conf.setInt("zookeeper.recovery.retry", 0);
     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
     conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
     TEST_UTIL = new HBaseTestingUtility(conf);
     TEST_UTIL.startMiniCluster(NUM_MASTERS, num_rs);
     cluster = TEST_UTIL.getHBaseCluster();
@@ -401,7 +399,6 @@ public class TestDistributedLogSplitting {
     abortMaster(cluster);
 
     // abort RS
-    int numRS = cluster.getLiveRegionServerThreads().size();
     LOG.info("Aborting region server: " + hrs.getServerName());
     hrs.abort("testing");
 
@@ -484,7 +481,6 @@ public class TestDistributedLogSplitting {
     abortMaster(cluster);
 
     // abort RS
-    int numRS = cluster.getLiveRegionServerThreads().size();
     LOG.info("Aborting region server: " + hrs.getServerName());
     hrs.abort("testing");
 
@@ -1103,7 +1099,6 @@ public class TestDistributedLogSplitting {
     // turn off load balancing to prevent regions from moving around otherwise
     // they will consume recovered.edits
     master.balanceSwitch(false);
-    FileSystem fs = master.getMasterFileSystem().getFileSystem();
 
     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(curConf, "table-creation", null);
     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();

View File

@@ -193,8 +193,7 @@ public class TestSplitLogManager {
   public void testTaskCreation() throws Exception {
 
     LOG.info("TestTaskCreation - test the creation of a task in zk");
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -214,8 +213,7 @@ public class TestSplitLogManager {
     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
       CreateMode.PERSISTENT);
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
     Task task = slm.findOrCreateOrphanTask(tasknode);
     assertTrue(task.isOrphan());
@@ -241,8 +239,7 @@ public class TestSplitLogManager {
       CreateMode.PERSISTENT);
     int version = ZKUtil.checkExists(zkw, tasknode);
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
     Task task = slm.findOrCreateOrphanTask(tasknode);
     assertTrue(task.isOrphan());
@@ -265,8 +262,7 @@ public class TestSplitLogManager {
     LOG.info("TestMultipleResbmits - no indefinite resubmissions");
 
     conf.setInt("hbase.splitlog.max.resubmit", 2);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -298,8 +294,7 @@ public class TestSplitLogManager {
   public void testRescanCleanup() throws Exception {
     LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -328,8 +323,7 @@ public class TestSplitLogManager {
   public void testTaskDone() throws Exception {
     LOG.info("TestTaskDone - cleanup task node once in DONE state");
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
     String tasknode = submitTaskAndWait(batch, "foo/1");
     final ServerName worker1 = new ServerName("worker1,1,1");
@@ -349,8 +343,7 @@ public class TestSplitLogManager {
     LOG.info("TestTaskErr - cleanup task node once in ERR state");
 
     conf.setInt("hbase.splitlog.max.resubmit", 0);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -372,8 +365,7 @@ public class TestSplitLogManager {
   public void testTaskResigned() throws Exception {
     LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
     assertEquals(tot_mgr_resubmit.get(), 0);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     assertEquals(tot_mgr_resubmit.get(), 0);
     TaskBatch batch = new TaskBatch();
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -407,8 +399,7 @@ public class TestSplitLogManager {
     zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
       CreateMode.PERSISTENT);
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
 
     // submit another task which will stay in unassigned mode
@@ -437,8 +428,7 @@ public class TestSplitLogManager {
     LOG.info("testDeadWorker");
 
     conf.setLong("hbase.splitlog.max.resubmit", 0);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -463,8 +453,7 @@ public class TestSplitLogManager {
   @Test
   public void testWorkerCrash() throws Exception {
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -489,8 +478,7 @@ public class TestSplitLogManager {
   @Test
   public void testEmptyLogDir() throws Exception {
     LOG.info("testEmptyLogDir");
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
       UUID.randomUUID().toString());
@@ -505,8 +493,7 @@ public class TestSplitLogManager {
     conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0);
     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 1000);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     final Path logDir = new Path(fs.getWorkingDirectory(),
       UUID.randomUUID().toString());
@@ -544,5 +531,4 @@ public class TestSplitLogManager {
       fs.delete(logDir, true);
     }
-  }
 
 }

View File

@@ -59,6 +59,7 @@ import org.junit.experimental.categories.Category;
 
 /** JUnit test case for HLog */
 @Category(LargeTests.class)
+@SuppressWarnings("deprecation")
 public class TestHLog {
   private static final Log LOG = LogFactory.getLog(TestHLog.class);
   {
@@ -193,10 +194,8 @@ public class TestHLog {
         log.rollWriter();
       }
       log.close();
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        hbaseDir, logdir, oldLogDir, fs);
-      List<Path> splits =
-        logSplitter.splitLog();
+      List<Path> splits = HLogSplitter.split(
+        hbaseDir, logdir, oldLogDir, fs, conf);
       verifySplits(splits, howmany);
       log = null;
     } finally {
@@ -340,7 +339,7 @@ public class TestHLog {
   private void verifySplits(List<Path> splits, final int howmany)
   throws IOException {
-    assertEquals(howmany, splits.size());
+    assertEquals(howmany * howmany, splits.size());
     for (int i = 0; i < splits.size(); i++) {
       LOG.info("Verifying=" + splits.get(i));
       HLog.Reader reader = HLogFactory.createReader(fs, splits.get(i), conf);
@@ -362,7 +361,7 @@ public class TestHLog {
         previousRegion = region;
         count++;
       }
-      assertEquals(howmany * howmany, count);
+      assertEquals(howmany, count);
     } finally {
       reader.close();
     }
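The flipped assertions in verifySplits are self-consistent on one plausible reading: with howmany logs each carrying howmany edits per region, the removed sequential path merged all logs first, producing howmany output files of howmany * howmany entries, while the distributed path splits each log separately, giving howmany * howmany files of howmany entries. The total edit count is conserved either way:

    int howmany = 3;                               // hypothetical value
    int before = howmany * (howmany * howmany);    // 3 files x 9 entries = 27
    int after  = (howmany * howmany) * howmany;    // 9 files x 3 entries = 27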
@@ -479,7 +478,7 @@ public class TestHLog {
       throw t.exception;
 
     // Make sure you can read all the content
-    HLog.Reader reader = HLogFactory.createReader(this.fs, walPath, this.conf);
+    HLog.Reader reader = HLogFactory.createReader(fs, walPath, conf);
     int count = 0;
     HLog.Entry entry = new HLog.Entry();
     while (reader.next(entry) != null) {

View File

@@ -71,8 +71,7 @@ public class TestHLogMethods {
     createFile(fs, recoverededits,
       Long.toString(Long.MAX_VALUE) + "." + System.currentTimeMillis());
 
-    HLog log = HLogFactory.createHLog(fs, regiondir,
-      "dummyLogName", util.getConfiguration());
+    HLogFactory.createHLog(fs, regiondir, "dummyLogName", util.getConfiguration());
     NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
     assertEquals(7, files.size());
     assertEquals(files.pollFirst().getName(), first);
@@ -111,9 +110,8 @@ public class TestHLogMethods {
   @Test
   public void testEntrySink() throws Exception {
     Configuration conf = new Configuration();
-    HLogSplitter splitter = HLogSplitter.createLogSplitter(
-      conf, mock(Path.class), mock(Path.class), mock(Path.class),
-      mock(FileSystem.class));
+    HLogSplitter splitter = new HLogSplitter(
+      conf, mock(Path.class), mock(FileSystem.class), null, null);
     EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
     for (int i = 0; i < 1000; i++) {

View File

@@ -42,11 +42,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
-import org.apache.log4j.Level;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -68,13 +63,17 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.CorruptedLogFileException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -107,7 +106,6 @@ public class TestHLogSplit {
   protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static final Path HBASEDIR = new Path("/hbase");
   private static final Path HLOGDIR = new Path(HBASEDIR, "hlog");
   private static final Path OLDLOGDIR = new Path(HBASEDIR, "hlog.old");
@@ -209,10 +207,15 @@ public class TestHLogSplit {
       @Override
       public Integer run() throws Exception {
         FileSystem fs = FileSystem.get(conf2);
-        HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf2, HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-        logSplitter.splitLog();
-        Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-        return countHLog(logfile, fs, conf2);
+        int expectedFiles = fs.listStatus(HLOGDIR).length;
+        HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf2);
+        Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+        assertEquals(expectedFiles, logfiles.length);
+        int count = 0;
+        for (Path logfile: logfiles) {
+          count += countHLog(logfile, fs, conf2);
+        }
+        return count;
       }
     });
     LOG.info("zombie=" + counter.get() + ", robber=" + count);
@@ -374,27 +377,6 @@ public class TestHLogSplit {
     HLogFactory.createWriter(fs, p, conf).close();
   }
-  @Test(expected = OrphanHLogAfterSplitException.class)
-  public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted()
-  throws IOException {
-    AtomicBoolean stop = new AtomicBoolean(false);
-    assertFalse("Previous test should clean up table dir",
-      fs.exists(new Path("/hbase/t1")));
-    generateHLogs(-1);
-    CountDownLatch latch = new CountDownLatch(1);
-    try {
-      (new ZombieNewLogWriterRegionServer(latch, stop)).start();
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-      logSplitter.splitLog(latch);
-    } finally {
-      stop.set(true);
-    }
-  }
   @Test
   public void testSplitPreservesEdits() throws IOException{
     final String REGION = "region__1";
@@ -403,14 +385,12 @@ public class TestHLogSplit {
     generateHLogs(1, 10, -1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
-    Path splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
-    assertEquals("edits differ after split", true, logsAreEqual(originalLog, splitLog));
+    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    assertEquals(1, splitLog.length);
+    assertEquals("edits differ after split", true, logsAreEqual(originalLog, splitLog[0]));
   }
@@ -425,16 +405,17 @@ public class TestHLogSplit {
     // initialize will create a new DFSClient with a new client ID
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    int expectedFiles = fs.listStatus(HLOGDIR).length - 2; // less 2 empty files
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countHLog(logfile, fs, conf);
+      }
+      assertEquals(NUM_WRITERS * ENTRIES, count);
     }
   }
@@ -448,13 +429,16 @@ public class TestHLogSplit {
     // initialize will create a new DFSClient with a new client ID
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    int expectedFiles = fs.listStatus(HLOGDIR).length - 2; // less 2 empty files
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countHLog(logfile, fs, conf);
+      }
+      assertEquals(NUM_WRITERS * ENTRIES, count);
     }
   }
@@ -465,16 +449,17 @@ public class TestHLogSplit {
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    int expectedFiles = fs.listStatus(HLOGDIR).length;
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countHLog(logfile, fs, conf);
+      }
+      assertEquals(NUM_WRITERS * ENTRIES, count);
     }
   }
@@ -486,15 +471,17 @@ public class TestHLogSplit {
         Corruptions.APPEND_GARBAGE, true, fs);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    int expectedFiles = fs.listStatus(HLOGDIR).length;
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countHLog(logfile, fs, conf);
+      }
+      assertEquals(NUM_WRITERS * ENTRIES, count);
     }
   }
   @Test
@@ -505,18 +492,19 @@ public class TestHLogSplit {
         Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    int expectedFiles = fs.listStatus(HLOGDIR).length - 1; // less 1 corrupted file
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-      assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf));
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countHLog(logfile, fs, conf);
+      }
+      assertEquals((NUM_WRITERS - 1) * ENTRIES, count);
     }
   }
   @Test
   public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
     conf.setBoolean(HBASE_SKIP_ERRORS, true);
@@ -524,19 +512,23 @@ public class TestHLogSplit {
     corruptHLog(new Path(HLOGDIR, HLOG_FILE_PREFIX + "5"),
         Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    int expectedFiles = fs.listStatus(HLOGDIR).length;
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countHLog(logfile, fs, conf);
+      }
       // the entries in the original logs are alternating regions
       // considering the sequence file header, the middle corruption should
       // affect at least half of the entries
       int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
       int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
       assertTrue("The file up to the corrupted area hasn't been parsed",
-          goodEntries + firstHalfEntries <= countHLog(logfile, fs, conf));
+          goodEntries + firstHalfEntries <= count);
     }
   }
@@ -556,9 +548,7 @@ public class TestHLogSplit {
     conf.set("faultysequencefilelogreader.failuretype", failureType.name());
     generateHLogs(1, ENTRIES, -1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
     assertEquals("expected a different file", c1.getName(), archivedLogs[0]
         .getPath().getName());
@@ -586,16 +576,13 @@ public class TestHLogSplit {
         FaultySequenceFileLogReader.class, HLog.Reader.class);
     conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
       generateHLogs(Integer.MAX_VALUE);
       fs.initialize(fs.getUri(), conf);
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-          HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-      logSplitter.splitLog();
+      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     } finally {
       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
         Reader.class);
       HLogFactory.resetLogReaderClass();
     }
   }
   @Test
@@ -613,10 +600,8 @@ public class TestHLogSplit {
     conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
     try {
-      logSplitter.splitLog();
+      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     } catch (IOException e) {
       assertEquals(
         "if skip.errors is false all files should remain in place",
@@ -627,7 +612,6 @@ public class TestHLogSplit {
         Reader.class);
       HLogFactory.resetLogReaderClass();
-    }
   }
   @Test
@@ -644,14 +628,13 @@ public class TestHLogSplit {
     corruptHLog(c1, Corruptions.TRUNCATE, true, fs);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
-    Path splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    assertEquals(1, splitLog.length);
     int actualCount = 0;
-    HLog.Reader in = HLogFactory.createReader(fs, splitLog, conf);
+    HLog.Reader in = HLogFactory.createReader(fs, splitLog[0], conf);
     @SuppressWarnings("unused")
     HLog.Entry entry;
     while ((entry = in.next()) != null) ++actualCount;
@@ -676,14 +659,13 @@ public class TestHLogSplit {
     corruptHLog(c1, Corruptions.TRUNCATE_TRAILER, true, fs);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
-    Path splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    assertEquals(1, splitLog.length);
     int actualCount = 0;
-    HLog.Reader in = HLogFactory.createReader(fs, splitLog, conf);
+    HLog.Reader in = HLogFactory.createReader(fs, splitLog[0], conf);
     @SuppressWarnings("unused")
     HLog.Entry entry;
     while ((entry = in.next()) != null) ++actualCount;
@@ -697,16 +679,10 @@ public class TestHLogSplit {
   @Test
   public void testLogsGetArchivedAfterSplit() throws IOException {
     conf.setBoolean(HBASE_SKIP_ERRORS, false);
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
     assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
   }
@@ -714,14 +690,17 @@ public class TestHLogSplit {
   public void testSplit() throws IOException {
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    int expectedFiles = fs.listStatus(HLOGDIR).length;
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countHLog(logfile, fs, conf);
+      }
+      assertEquals(NUM_WRITERS * ENTRIES, count);
     }
   }
@@ -730,9 +709,7 @@ public class TestHLogSplit {
   throws IOException {
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     FileStatus [] statuses = null;
     try {
       statuses = fs.listStatus(HLOGDIR);
@@ -745,41 +722,6 @@ public class TestHLogSplit {
     }
   }
-  @Test
-  public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted()
-  throws IOException {
-    AtomicBoolean stop = new AtomicBoolean(false);
-    generateHLogs(-1);
-    fs.initialize(fs.getUri(), conf);
-    CountDownLatch latch = new CountDownLatch(1);
-    Thread zombie = new ZombieNewLogWriterRegionServer(latch, stop);
-    List<Path> splits = null;
-    try {
-      zombie.start();
-      try {
-        HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-          HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-        splits = logSplitter.splitLog(latch);
-      } catch (IOException ex) {
-        /* expected */
-        LOG.warn("testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted", ex);
-      }
-      FileStatus[] files = fs.listStatus(HLOGDIR);
-      if (files == null) fail("no files in " + HLOGDIR + " with splits " + splits);
-      int logFilesNumber = files.length;
-      assertEquals("Log files should not be archived if there's an extra file after split",
-        NUM_WRITERS + 1, logFilesNumber);
-    } finally {
-      stop.set(true);
-    }
-  }
   @Test(expected = IOException.class)
   public void testSplitWillFailIfWritingToRegionFails() throws Exception {
     //leave 5th log open so we could append the "trap"
@@ -798,10 +740,7 @@ public class TestHLogSplit {
     try {
       InstrumentedSequenceFileLogWriter.activateFailure = true;
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-          HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-      logSplitter.splitLog();
+      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     } catch (IOException e) {
       assertEquals("This exception is instrumented and should only be thrown for testing", e.getMessage());
       throw e;
@@ -825,21 +764,14 @@ public class TestHLogSplit {
     generateHLogs(1, 100, -1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     fs.rename(OLDLOGDIR, HLOGDIR);
     Path firstSplitPath = new Path(HBASEDIR, Bytes.toString(TABLE_NAME) + ".first");
     Path splitPath = new Path(HBASEDIR, Bytes.toString(TABLE_NAME));
-    fs.rename(splitPath,
-        firstSplitPath);
+    fs.rename(splitPath, firstSplitPath);
     fs.initialize(fs.getUri(), conf);
-    logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
   }
@@ -850,16 +782,11 @@ public class TestHLogSplit {
     REGIONS.add(region);
     generateHLogs(1);
     fs.initialize(fs.getUri(), conf);
     Path regiondir = new Path(TABLEDIR, region);
     fs.delete(regiondir, true);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     assertFalse(fs.exists(regiondir));
   }
@@ -868,20 +795,23 @@ public class TestHLogSplit {
     conf.setBoolean(HBASE_SKIP_ERRORS, false);
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
+    FileStatus[] logfiles = fs.listStatus(HLOGDIR);
+    assertTrue("There should be some log file",
+      logfiles != null && logfiles.length > 0);
     // Set up a splitter that will throw an IOE on the output side
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, HBASEDIR, HLOGDIR, OLDLOGDIR, fs, null) {
-      protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
-      throws IOException {
+        conf, HBASEDIR, fs, null, null) {
+      protected HLog.Writer createWriter(FileSystem fs,
+          Path logfile, Configuration conf) throws IOException {
         HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
-        Mockito.doThrow(new IOException("Injected")).when(mockWriter).append(Mockito.<HLog.Entry>any());
+        Mockito.doThrow(new IOException("Injected")).when(
+          mockWriter).append(Mockito.<HLog.Entry>any());
         return mockWriter;
       }
     };
     try {
-      logSplitter.splitLog();
+      logSplitter.splitLogFile(logfiles[0], null);
       fail("Didn't throw!");
     } catch (IOException ioe) {
       assertTrue(ioe.toString().contains("Injected"));
@@ -903,11 +833,8 @@ public class TestHLogSplit {
     Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
         when(spiedFs).append(Mockito.<Path>any());
-    HLogSplitter logSplitter = new HLogSplitter(
-        conf, HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, null);
     try {
-      logSplitter.splitLog();
+      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, conf);
       assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
       assertFalse(fs.exists(HLOGDIR));
     } catch (IOException e) {
@@ -945,11 +872,8 @@ public class TestHLogSplit {
       }
     }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
-    HLogSplitter logSplitter = new HLogSplitter(
-        conf, HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, null);
     try {
-      logSplitter.splitLog();
+      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, conf);
       assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
       assertFalse(fs.exists(HLOGDIR));
     } catch (IOException e) {
@@ -984,8 +908,8 @@ public class TestHLogSplit {
     try {
       conf.setInt("hbase.splitlog.report.period", 1000);
-      HLogSplitter s = new HLogSplitter(conf, HBASEDIR, null, null, spiedFs, null);
-      boolean ret = s.splitLogFile(logfile, localReporter);
+      boolean ret = HLogSplitter.splitLogFile(
+          HBASEDIR, logfile, spiedFs, conf, localReporter, null, null);
       assertFalse("Log splitting should failed", ret);
       assertTrue(count.get() > 0);
     } catch (IOException e) {
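splitLogFile also has the static form used above, which splits a single log in one call and returns whether the file split cleanly. A sketch of a caller, with the reporter shaped like the CancelableProgressable field this commit deletes further down (the trailing nulls again fill collaborator slots left empty by these tests):

    CancelableProgressable reporter = new CancelableProgressable() {
      int count = 0;
      @Override
      public boolean progress() {
        count++;     // called periodically while the split runs
        return true; // returning false would cancel the split
      }
    };
    FileStatus logfile = fs.listStatus(HLOGDIR)[0];
    boolean ret = HLogSplitter.splitLogFile(
        HBASEDIR, logfile, fs, conf, reporter, null, null);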
@@ -1034,7 +958,8 @@ public class TestHLogSplit {
     localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
     // Create a fake log file (we'll override the reader to produce a stream of edits)
-    FSDataOutputStream out = fs.create(new Path(HLOGDIR, HLOG_FILE_PREFIX + ".fake"));
+    Path logPath = new Path(HLOGDIR, HLOG_FILE_PREFIX + ".fake");
+    FSDataOutputStream out = fs.create(logPath);
     out.close();
     // Make region dirs for our destination regions so the output doesn't get skipped
@@ -1043,7 +968,7 @@ public class TestHLogSplit {
     // Create a splitter that reads and writes the data without touching disk
     HLogSplitter logSplitter = new HLogSplitter(
-        localConf, HBASEDIR, HLOGDIR, OLDLOGDIR, fs, null) {
+        localConf, HBASEDIR, fs, null, null) {
       /* Produce a mock writer that doesn't write anywhere */
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
@@ -1076,7 +1001,6 @@ public class TestHLogSplit {
         return mockWriter;
       }
       /* Produce a mock reader that generates fake entries */
       protected Reader getReader(FileSystem fs, Path curLogFile,
           Configuration conf, CancelableProgressable reporter) throws IOException {
@@ -1103,15 +1027,13 @@ public class TestHLogSplit {
       }
     };
-    logSplitter.splitLog();
+    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
     // Verify number of written edits per region
-    Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
+    Map<byte[], Long> outputCounts = logSplitter.getOutputCounts();
     for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
       LOG.info("Got " + entry.getValue() + " output edits for region " +
           Bytes.toString(entry.getKey()));
       assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
     }
     assertEquals(regions.size(), outputCounts.size());
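Per-region edit counts now come straight off the splitter rather than through its outputSink field. Usage sketch, following the hunk above:

    Map<byte[], Long> outputCounts = logSplitter.getOutputCounts();
    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
      // one entry per destination region: encoded region name -> edits written
      LOG.info("Region " + Bytes.toString(entry.getKey()) + ": "
          + entry.getValue() + " edits");
    }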
@@ -1160,9 +1082,7 @@ public class TestHLogSplit {
     LOG.debug("Renamed region directory: " + rsSplitDir);
     // Process the old log files
-    HLogSplitter splitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, rsSplitDir, OLDLOGDIR, fs);
-    splitter.splitLog();
+    HLogSplitter.split(HBASEDIR, rsSplitDir, OLDLOGDIR, fs, conf);
     // Now, try to roll the HLog and verify failure
     try {
@@ -1232,17 +1152,6 @@ public class TestHLogSplit {
     }
   }
-  private CancelableProgressable reporter = new CancelableProgressable() {
-    int count = 0;
-    @Override
-    public boolean progress() {
-      count++;
-      LOG.debug("progress = " + count);
-      return true;
-    }
-  };
   @Test
   public void testSplitLogFileWithOneRegion() throws IOException {
     LOG.info("testSplitLogFileWithOneRegion");
@@ -1250,60 +1159,45 @@ public class TestHLogSplit {
     REGIONS.removeAll(REGIONS);
     REGIONS.add(REGION);
     generateHLogs(1, 10, -1);
-    FileStatus logfile = fs.listStatus(HLOGDIR)[0];
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, reporter);
-    HLogSplitter.finishSplitLogFile(HBASEDIR, OLDLOGDIR, logfile.getPath()
-        .toString(), conf);
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
-    Path splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
-    assertEquals(true, logsAreEqual(originalLog, splitLog));
+    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    assertEquals(1, splitLog.length);
+    assertEquals(true, logsAreEqual(originalLog, splitLog[0]));
   }
   @Test
-  public void testSplitLogFileDeletedRegionDir()
-  throws IOException {
+  public void testSplitLogFileDeletedRegionDir() throws IOException {
     LOG.info("testSplitLogFileDeletedRegionDir");
     final String REGION = "region__1";
     REGIONS.removeAll(REGIONS);
     REGIONS.add(REGION);
     generateHLogs(1, 10, -1);
-    FileStatus logfile = fs.listStatus(HLOGDIR)[0];
     fs.initialize(fs.getUri(), conf);
     Path regiondir = new Path(TABLEDIR, REGION);
     LOG.info("Region directory is" + regiondir);
     fs.delete(regiondir, true);
-    HLogSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, reporter);
-    HLogSplitter.finishSplitLogFile(HBASEDIR, OLDLOGDIR, logfile.getPath()
-        .toString(), conf);
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     assertTrue(!fs.exists(regiondir));
     assertTrue(true);
   }
   @Test
   public void testSplitLogFileEmpty() throws IOException {
     LOG.info("testSplitLogFileEmpty");
     injectEmptyFile(".empty", true);
-    FileStatus logfile = fs.listStatus(HLOGDIR)[0];
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, reporter);
-    HLogSplitter.finishSplitLogFile(HBASEDIR, OLDLOGDIR, logfile.getPath()
-        .toString(), conf);
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     Path tdir = HTableDescriptor.getTableDir(HBASEDIR, TABLE_NAME);
     assertFalse(fs.exists(tdir));
@@ -1314,15 +1208,13 @@ public class TestHLogSplit {
   public void testSplitLogFileMultipleRegions() throws IOException {
     LOG.info("testSplitLogFileMultipleRegions");
     generateHLogs(1, 10, -1);
-    FileStatus logfile = fs.listStatus(HLOGDIR)[0];
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, reporter);
-    HLogSplitter.finishSplitLogFile(HBASEDIR, OLDLOGDIR, logfile.getPath()
-        .toString(), conf);
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path recovered = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-      assertEquals(10, countHLog(recovered, fs, conf));
+      Path[] recovered = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(1, recovered.length);
+      assertEquals(10, countHLog(recovered[0], fs, conf));
     }
   }
@@ -1337,9 +1229,7 @@ public class TestHLogSplit {
         Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, reporter);
-    HLogSplitter.finishSplitLogFile(HBASEDIR, OLDLOGDIR, logfile.getPath()
-        .toString(), conf);
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
         "hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
@@ -1361,9 +1251,12 @@ public class TestHLogSplit {
     generateHLogs(-1);
     HLogFactory.createHLog(fs, regiondir, regionName, conf);
+    FileStatus[] logfiles = fs.listStatus(HLOGDIR);
+    assertTrue("There should be some log file",
+      logfiles != null && logfiles.length > 0);
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, HBASEDIR, HLOGDIR, OLDLOGDIR, fs, null) {
+        conf, HBASEDIR, fs, null, null) {
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
       throws IOException {
         HLog.Writer writer = HLogFactory.createWriter(fs, logfile, conf);
@@ -1384,7 +1277,7 @@ public class TestHLogSplit {
       }
     };
     try{
-      logSplitter.splitLog();
+      logSplitter.splitLogFile(logfiles[0], null);
     } catch (IOException e) {
       LOG.info(e);
       Assert.fail("Throws IOException when spliting "
@@ -1443,15 +1336,18 @@ public class TestHLogSplit {
     return ws;
   }
-  private Path getLogForRegion(Path rootdir, byte[] table, String region)
+  private Path[] getLogForRegion(Path rootdir, byte[] table, String region)
   throws IOException {
     Path tdir = HTableDescriptor.getTableDir(rootdir, table);
     @SuppressWarnings("deprecation")
     Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
         Bytes.toString(region.getBytes())));
     FileStatus [] files = this.fs.listStatus(editsdir);
-    assertEquals(1, files.length);
-    return files[0].getPath();
+    Path[] paths = new Path[files.length];
+    for (int i = 0; i < files.length; i++) {
+      paths[i] = files[i].getPath();
+    }
+    return paths;
   }
   private void corruptHLog(Path path, Corruptions corruption, boolean close,
@@ -1635,6 +1531,4 @@ public class TestHLogSplit {
       }
       return true;
     }
 }
@@ -21,13 +21,11 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -814,8 +812,8 @@ public class TestWALReplay {
     wal.completeCacheFlush(hri.getEncodedNameAsBytes());
     wal.close();
     FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
-    HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf,
-        null);
+    HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0],
+        this.fs, this.conf, null, null, null);
     FileStatus[] listStatus1 = this.fs.listStatus(new Path(hbaseRootDir + "/"
         + tableNameStr + "/" + hri.getEncodedName() + "/recovered.edits"));
     int editCount = 0;
@@ -923,10 +921,8 @@ public class TestWALReplay {
    * @throws IOException
    */
   private Path runWALSplit(final Configuration c) throws IOException {
-    FileSystem fs = FileSystem.get(c);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c,
-        this.hbaseRootDir, this.logDir, this.oldLogDir, fs);
-    List<Path> splits = logSplitter.splitLog();
+    List<Path> splits = HLogSplitter.split(
+        hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c);
     // Split should generate only 1 file since there's only 1 region
     assertEquals("splits=" + splits, 1, splits.size());
     // Make sure the file exists
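With this hunk, TestWALReplay's helper shrinks to the same one-call shape used everywhere else. The whole method as it reads afterwards (a sketch; the final existence check and return of the single split path are assumptions, since the tail of the method is not shown in this hunk):

    private Path runWALSplit(final Configuration c) throws IOException {
      List<Path> splits = HLogSplitter.split(
          hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c);
      // Split should generate only 1 file since there's only 1 region
      assertEquals("splits=" + splits, 1, splits.size());
      // Make sure the file exists
      assertTrue(FileSystem.get(c).exists(splits.get(0)));
      return splits.get(0); // assumption: helper hands back the lone split path
    }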
@@ -123,7 +123,6 @@ public class TestHBaseFsck {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, false);
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.handler.count", 2);
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.metahandler.count", 2);
     TEST_UTIL.startMiniCluster(3);