HBASE-15813 Rename DefaultWALProvider to a more specific name and clean up unnecessary reference to it
commit 1267f76e9a
parent c867858c44
@@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.master.SplitLogManager.Task;
 import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -715,7 +715,8 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
       }
       // decode the file name
       t = ZKSplitLog.getFileName(t);
-      ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(new Path(t));
+      ServerName serverName = AbstractFSWALProvider
+          .getServerNameFromWALDirectoryName(new Path(t));
       if (serverName != null) {
         knownFailedServers.add(serverName.getServerName());
       } else {
@@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
 import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
@@ -418,7 +418,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
       // pick meta wal firstly
       int offset = (int) (Math.random() * paths.size());
       for (int i = 0; i < paths.size(); i++) {
-        if (DefaultWALProvider.isMetaFile(paths.get(i))) {
+        if (AbstractFSWALProvider.isMetaFile(paths.get(i))) {
           offset = i;
           break;
         }
@@ -40,7 +40,7 @@ import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -353,7 +353,7 @@ public class HFileSystem extends FilterFileSystem {
     public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
         throws IOException {

-      ServerName sn = DefaultWALProvider.getServerNameFromWALDirectoryName(conf, src);
+      ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, src);
       if (sn == null) {
         // It's not an WAL
         return;
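The reorder hook above fires only for WAL files, which it detects by recovering a ServerName from the path. A minimal usage sketch of the renamed helper, assuming an HBase classpath; the path below is hypothetical but follows the usual <hbase.rootdir>/WALs/<servername>/... layout:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

public class WalPathCheck {
  public static void main(String[] args) {
    // Hypothetical WAL path; a non-WAL path yields null, matching the early return above.
    Path wal = new Path("hdfs://nn/hbase/WALs/rs1.example.com,16020,1460000000000/wal.1");
    ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(wal);
    System.out.println(sn == null ? "not a WAL path" : sn.getServerName());
  }
}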
@@ -732,8 +732,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     return e.getClassName().endsWith("RetryStartFileException");
   }

-  static void completeFile(DFSClient client, ClientProtocol namenode, String src,
-      String clientName, ExtendedBlock block, long fileId) {
+  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
+      ExtendedBlock block, long fileId) {
     for (int retry = 0;; retry++) {
       try {
         if (namenode.complete(src, clientName, block, fileId)) {
@@ -742,9 +742,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
         } else {
           LOG.warn("complete file " + src + " not finished, retry = " + retry);
         }
-      } catch (LeaseExpiredException e) {
-        LOG.warn("lease for file " + src + " is expired, give up", e);
-        return;
+      } catch (RemoteException e) {
+        IOException ioe = e.unwrapRemoteException();
+        if (ioe instanceof LeaseExpiredException) {
+          LOG.warn("lease for file " + src + " is expired, give up", e);
+          return;
+        } else {
+          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
+        }
       } catch (Exception e) {
         LOG.warn("complete file " + src + " failed, retry = " + retry, e);
       }
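The new catch clause matters because exceptions thrown on the namenode reach the client wrapped in a RemoteException, so a LeaseExpiredException is only visible after unwrapping. A minimal sketch of the unwrap-then-branch pattern, with a hypothetical Call interface standing in for the real namenode RPC:

import java.io.IOException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;

public class UnwrapRetrySketch {
  /** Stand-in for the namenode.complete(...) call above. */
  interface Call {
    boolean run() throws IOException;
  }

  static void retryUntilDone(Call call) {
    for (int retry = 0;; retry++) {
      try {
        if (call.run()) {
          return; // done
        }
      } catch (RemoteException e) {
        // The server-side exception type is only recoverable after unwrapping.
        IOException ioe = e.unwrapRemoteException();
        if (ioe instanceof LeaseExpiredException) {
          return; // the lease is gone; retrying cannot help, so give up
        }
        // otherwise fall through and retry
      } catch (IOException e) {
        // transient failure; retry
      }
    }
  }
}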
@@ -44,7 +44,6 @@ import java.util.concurrent.locks.ReentrantLock;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,6 +61,7 @@ import org.apache.hadoop.hbase.RegionStateListener;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
@@ -84,13 +84,13 @@ import org.apache.hadoop.hbase.quotas.QuotaExceededException;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
@@ -500,8 +500,8 @@ public class AssignmentManager {
         // In the case of a clean exit, the shutdown handler would have presplit any WALs and
         // removed empty directories.
         Path logDir = new Path(rootdir,
-            DefaultWALProvider.getWALDirectoryName(serverName.toString()));
-        Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
+            AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+        Path splitDir = logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
         if (checkWals(fs, logDir) || checkWals(fs, splitDir)) {
           LOG.debug("Found queued dead server " + serverName);
           failover = true;
@@ -18,6 +18,14 @@
  */
 package org.apache.hadoop.hbase.master;

+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
@@ -97,14 +105,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.zookeeper.KeeperException;

-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
 /**
  * Implements the master RPC services.
  */
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALSplitter;

 /**
@@ -57,14 +57,14 @@ public class MasterWalManager {
   final static PathFilter META_FILTER = new PathFilter() {
     @Override
     public boolean accept(Path p) {
-      return DefaultWALProvider.isMetaFile(p);
+      return AbstractFSWALProvider.isMetaFile(p);
     }
   };

   final static PathFilter NON_META_FILTER = new PathFilter() {
     @Override
     public boolean accept(Path p) {
-      return !DefaultWALProvider.isMetaFile(p);
+      return !AbstractFSWALProvider.isMetaFile(p);
     }
   };

@@ -186,7 +186,7 @@ public class MasterWalManager {
         // Empty log folder. No recovery needed
         continue;
       }
-      final ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(
+      final ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(
           status.getPath());
       if (null == serverName) {
         LOG.warn("Log folder " + status.getPath() + " doesn't look like its name includes a " +
@@ -261,8 +261,8 @@ public class MasterWalManager {
     try {
       for (ServerName serverName : serverNames) {
         Path logDir = new Path(this.rootDir,
-            DefaultWALProvider.getWALDirectoryName(serverName.toString()));
-        Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
+            AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+        Path splitDir = logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
         // Rename the directory so a rogue RS doesn't create more WALs
         if (fs.exists(logDir)) {
           if (!this.fs.rename(logDir, splitDir)) {
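Both call sites above (and the one in AssignmentManager earlier) derive the same directory pair, which is why the rename has to touch each of them. A minimal sketch of that layout, using a hypothetical root directory and server name:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

public class SplitDirSketch {
  public static void main(String[] args) {
    Path rootDir = new Path("hdfs://nn/hbase"); // hypothetical hbase.rootdir
    String serverName = "rs1.example.com,16020,1460000000000"; // hypothetical server
    Path logDir = new Path(rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName));
    // Renaming logDir to the splitting variant fences out a rogue region server.
    Path splitDir = logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
    System.out.println(logDir + " -> " + splitDir);
  }
}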
@@ -24,6 +24,8 @@ import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;

+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -63,9 +65,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

 /**
  * Distributes the task of log splitting to the available region servers.
@@ -169,12 +169,12 @@ public class SplitLogManager {
    * Get a list of paths that need to be split given a set of server-specific directories and
    * optionally a filter.
    *
-   * See {@link DefaultWALProvider#getServerNameFromWALDirectoryName} for more info on directory
+   * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory
    * layout.
    *
    * Should be package-private, but is needed by
    * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
-   * Configuration, WALFactory)} for tests.
+   * Configuration, org.apache.hadoop.hbase.wal.WALFactory)} for tests.
    */
   @VisibleForTesting
   public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
@@ -225,7 +225,7 @@ public class SplitLogManager {
     Set<ServerName> serverNames = new HashSet<ServerName>();
     for (Path logDir : logDirs) {
       try {
-        ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logDir);
+        ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir);
         if (serverName != null) {
           serverNames.add(serverName);
         }
@@ -21,12 +21,12 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

 /**
  * This Chore, every time it runs, will attempt to delete the WALs in the old logs folder. The WAL
@@ -51,6 +51,6 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {

   @Override
   protected boolean validate(Path file) {
-    return DefaultWALProvider.validateWALFilename(file.getName());
+    return AbstractFSWALProvider.validateWALFilename(file.getName());
   }
 }
@@ -18,6 +18,17 @@
  */
 package org.apache.hadoop.hbase.regionserver;

+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.Thread.UncaughtExceptionHandler;
@@ -163,7 +174,7 @@ import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
@@ -183,17 +194,6 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;

-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
 import sun.misc.Signal;
 import sun.misc.SignalHandler;

@@ -1655,7 +1655,7 @@ public class HRegionServer extends HasThread implements
   private WALFactory setupWALAndReplication() throws IOException {
     // TODO Replication make assumptions here based on the default filesystem impl
     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
+    final String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());

     Path logdir = new Path(rootDir, logName);
     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
@@ -878,6 +878,12 @@ public abstract class AbstractFSWAL<W> implements WAL {
     }
   }

+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + " " + walFilePrefix + ":" + walFileSuffix + "(num "
+        + filenum + ")";
+  }
+
   /**
    * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
    * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment
@@ -17,6 +17,15 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;

+import com.google.common.annotations.VisibleForTesting;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -30,15 +39,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;

-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.ExceptionHandler;
-import com.lmax.disruptor.LifecycleAware;
-import com.lmax.disruptor.TimeoutException;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -54,7 +54,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
@@ -212,7 +212,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
    * @param prefix should always be hostname and port in distributed env and it will be URL encoded
    *          before being used. If prefix is null, "wal" will be used
    * @param suffix will be url encoded. null is treated as empty. non-empty must start with
-   *          {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER}
+   *          {@link org.apache.hadoop.hbase.wal.AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER}
    */
   public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
       final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
@@ -258,8 +258,9 @@ public class FSHLog extends AbstractFSWAL<Writer> {
   /**
    * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate the
    * default behavior (such as setting the maxRecoveryErrorCount value for example (see
-   * {@link TestWALReplay#testReplayEditsWrittenIntoWAL()}). This is done using reflection on the
-   * underlying HDFS OutputStream. NOTE: This could be removed once Hadoop1 support is removed.
+   * {@link AbstractTestWALReplay#testReplayEditsWrittenIntoWAL()}). This is done using reflection
+   * on the underlying HDFS OutputStream. NOTE: This could be removed once Hadoop1 support is
+   * removed.
    * @return null if underlying stream is not ready.
    */
   @VisibleForTesting
@@ -288,7 +289,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
    * @return Writer instance
    */
   protected Writer createWriterInstance(final Path path) throws IOException {
-    Writer writer = DefaultWALProvider.createWriter(conf, fs, path, false);
+    Writer writer = FSHLogProvider.createWriter(conf, fs, path, false);
     if (writer instanceof ProtobufLogWriter) {
       preemptiveSync((ProtobufLogWriter) writer);
     }
@@ -433,11 +434,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     }
   }

-  @Override
-  public String toString() {
-    return "FSHLog " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
-  }
-
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
       justification = "Will never be null")
   @Override
|
|||
package org.apache.hadoop.hbase.regionserver.wal;
|
||||
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -30,14 +32,11 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CollectionUtils;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
|
||||
/**
|
||||
* A WAL Entry for {@link FSHLog} implementation. Immutable.
|
||||
* A WAL Entry for {@link AbstractFSWAL} implementation. Immutable.
|
||||
* A subclass of {@link Entry} that carries extra info across the ring buffer such as
|
||||
* region sequence id (we want to use this later, just before we write the WAL to ensure region
|
||||
* edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit
|
||||
|
|
|
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;

 /**
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  */
 @InterfaceAudience.Private
 public class ProtobufLogWriter extends AbstractProtobufLogWriter
-    implements DefaultWALProvider.Writer {
+    implements FSHLogProvider.Writer {

   private static final Log LOG = LogFactory.getLog(ProtobufLogWriter.class);

|
|||
import org.apache.hadoop.hbase.io.util.LRUDictionary;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
||||
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
|
||||
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
||||
public abstract class ReaderBase implements DefaultWALProvider.Reader {
|
||||
public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
|
||||
private static final Log LOG = LogFactory.getLog(ReaderBase.class);
|
||||
protected Configuration conf;
|
||||
protected FileSystem fs;
|
||||
|
|
|
@ -18,6 +18,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.Service;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
@ -64,14 +68,10 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.Service;
|
||||
|
||||
/**
|
||||
* Class that handles the source of a replication stream.
|
||||
* Currently does not handle more than 1 slave
|
||||
|
@ -198,7 +198,7 @@ public class ReplicationSource extends Thread
|
|||
|
||||
@Override
|
||||
public void enqueueLog(Path log) {
|
||||
String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
|
||||
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
|
||||
PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
|
||||
if (queue == null) {
|
||||
queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
|
||||
|
@ -839,12 +839,10 @@ public class ReplicationSource extends Thread
|
|||
final Path rootDir = FSUtils.getRootDir(conf);
|
||||
for (String curDeadServerName : deadRegionServers) {
|
||||
final Path deadRsDirectory = new Path(rootDir,
|
||||
DefaultWALProvider.getWALDirectoryName(curDeadServerName));
|
||||
Path[] locs = new Path[] {
|
||||
new Path(deadRsDirectory, currentPath.getName()),
|
||||
new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
|
||||
currentPath.getName()),
|
||||
};
|
||||
AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
|
||||
Path[] locs = new Path[] { new Path(deadRsDirectory, currentPath.getName()),
|
||||
new Path(deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT),
|
||||
currentPath.getName()) };
|
||||
for (Path possibleLogLocation : locs) {
|
||||
LOG.info("Possible location " + possibleLogLocation.toUri().toString());
|
||||
if (manager.getFs().exists(possibleLogLocation)) {
|
||||
|
|
|
@@ -19,6 +19,8 @@

 package org.apache.hadoop.hbase.replication.regionserver;

+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -61,9 +63,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

 /**
  * This class is responsible to manage all the replication
@@ -197,7 +197,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param queueRecovered Whether this is a recovered queue
    */
   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
-    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(key);
     if (queueRecovered) {
       SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
       if (wals != null && !wals.first().equals(key)) {
@@ -277,7 +277,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     if (this.latestPaths.size() > 0) {
       for (Path logPath : latestPaths) {
         String name = logPath.getName();
-        String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
         SortedSet<String> logs = new TreeSet<String>();
         logs.add(name);
         walsByGroup.put(walPrefix, logs);
@@ -358,7 +358,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   void preLogRoll(Path newLog) throws IOException {
     recordLog(newLog);
     String logName = newLog.getName();
-    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
     synchronized (latestPaths) {
       Iterator<Path> iterator = latestPaths.iterator();
       while (iterator.hasNext()) {
@@ -380,7 +380,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   private void recordLog(Path logPath) throws IOException {
     String logName = logPath.getName();
-    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
     // update replication queues on ZK
     synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for
                                      // the to-be-removed peer
@@ -674,7 +674,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
       walsByIdRecoveredQueues.put(peerId, walsByGroup);
       for (String wal : walsSet) {
-        String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(wal);
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
         SortedSet<String> wals = walsByGroup.get(walPrefix);
         if (wals == null) {
           wals = new TreeSet<String>();
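All of the replication call sites above key their queues on the WAL prefix (the group name before the trailing timestamp), which is why they all switch to the renamed helper together. A small grouping sketch, using a hypothetical rolled-WAL file name:

import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

public class WalGroupingSketch {
  public static void main(String[] args) {
    TreeMap<String, SortedSet<String>> walsByGroup = new TreeMap<>();
    // Hypothetical rolled WAL name: <url-encoded server name>.<timestamp>
    String wal = "rs1.example.com%2C16020%2C1460000000000.1461000000000";
    String prefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
    walsByGroup.computeIfAbsent(prefix, p -> new TreeSet<>()).add(wal);
    System.out.println(walsByGroup);
  }
}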
@@ -38,9 +38,9 @@ import org.apache.hadoop.hbase.util.FSUtils;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class DefaultWALProvider extends AbstractFSWALProvider<FSHLog> {
+public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {

-  private static final Log LOG = LogFactory.getLog(DefaultWALProvider.class);
+  private static final Log LOG = LogFactory.getLog(FSHLogProvider.class);

   // Only public so classes back in regionserver.wal can access
   public interface Writer extends WALProvider.Writer {
@@ -74,8 +74,8 @@ public class WALFactory {
    * Maps between configuration names for providers and implementation classes.
    */
   static enum Providers {
-    defaultProvider(DefaultWALProvider.class),
-    filesystem(DefaultWALProvider.class),
+    defaultProvider(FSHLogProvider.class),
+    filesystem(FSHLogProvider.class),
     multiwal(RegionGroupingProvider.class),
     asyncfs(AsyncFSWALProvider.class);

@@ -101,7 +101,7 @@ public class WALFactory {
   /**
    * Configuration-specified WAL Reader used when a custom reader is requested
    */
-  private final Class<? extends DefaultWALProvider.Reader> logReaderClass;
+  private final Class<? extends AbstractFSWALProvider.Reader> logReaderClass;

   /**
    * How long to attempt opening in-recovery wals
@@ -118,7 +118,7 @@ public class WALFactory {
     timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
     /* TODO Both of these are probably specific to the fs wal provider */
     logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
-        DefaultWALProvider.Reader.class);
+        AbstractFSWALProvider.Reader.class);
     this.conf = conf;
     // end required early initialization

@@ -127,14 +127,15 @@ public class WALFactory {
     factoryId = SINGLETON_ID;
   }

-  Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
+  @VisibleForTesting
+  public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
     try {
       return Providers.valueOf(conf.get(key, defaultValue)).clazz;
     } catch (IllegalArgumentException exception) {
       // Fall back to them specifying a class name
       // Note that the passed default class shouldn't actually be used, since the above only fails
       // when there is a config value present.
-      return conf.getClass(key, DefaultWALProvider.class, WALProvider.class);
+      return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class);
     }
   }

@@ -180,7 +181,7 @@ public class WALFactory {
     timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
     /* TODO Both of these are probably specific to the fs wal provider */
     logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
-        DefaultWALProvider.Reader.class);
+        AbstractFSWALProvider.Reader.class);
     this.conf = conf;
     this.factoryId = factoryId;
     // end required early initialization
@@ -248,7 +249,7 @@ public class WALFactory {
     if (null == metaProvider) {
       final WALProvider temp = getProvider(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER,
           Collections.<WALActionsListener>singletonList(new MetricsWAL()),
-          DefaultWALProvider.META_WAL_PROVIDER_ID);
+          AbstractFSWALProvider.META_WAL_PROVIDER_ID);
       if (this.metaProvider.compareAndSet(null, temp)) {
         metaProvider = temp;
       } else {
@@ -279,7 +280,7 @@ public class WALFactory {
   public Reader createReader(final FileSystem fs, final Path path,
       CancelableProgressable reporter, boolean allowCustom)
       throws IOException {
-    Class<? extends DefaultWALProvider.Reader> lrClass =
+    Class<? extends AbstractFSWALProvider.Reader> lrClass =
         allowCustom ? logReaderClass : ProtobufLogReader.class;

     try {
@@ -294,7 +295,7 @@ public class WALFactory {
       try {
         if (lrClass != ProtobufLogReader.class) {
           // User is overriding the WAL reader, let them.
-          DefaultWALProvider.Reader reader = lrClass.newInstance();
+          AbstractFSWALProvider.Reader reader = lrClass.newInstance();
           reader.init(fs, path, conf, null);
           return reader;
         } else {
@@ -306,7 +307,7 @@ public class WALFactory {
           byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length];
           boolean isPbWal = (stream.read(magic) == magic.length)
               && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC);
-          DefaultWALProvider.Reader reader =
+          AbstractFSWALProvider.Reader reader =
               isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader();
           reader.init(fs, path, conf, stream);
           return reader;
@@ -366,7 +367,7 @@ public class WALFactory {
    * @throws IOException
    */
   public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
-    return DefaultWALProvider.createWriter(conf, fs, path, false);
+    return FSHLogProvider.createWriter(conf, fs, path, false);
   }

   /**
@@ -376,7 +377,7 @@ public class WALFactory {
   @VisibleForTesting
   public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
       throws IOException {
-    return DefaultWALProvider.createWriter(conf, fs, path, true);
+    return FSHLogProvider.createWriter(conf, fs, path, true);
   }

   // These static methods are currently used where it's impractical to
@@ -444,7 +445,7 @@ public class WALFactory {
   static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
       final Configuration configuration)
       throws IOException {
-    return DefaultWALProvider.createWriter(configuration, fs, path, true);
+    return FSHLogProvider.createWriter(configuration, fs, path, true);
   }

   /**
@@ -455,7 +456,7 @@ public class WALFactory {
   public static Writer createWALWriter(final FileSystem fs, final Path path,
       final Configuration configuration)
       throws IOException {
-    return DefaultWALProvider.createWriter(configuration, fs, path, false);
+    return FSHLogProvider.createWriter(configuration, fs, path, false);
   }

   public final WALProvider getWALProvider() {
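After the rename both enum keys still resolve: "defaultProvider" and "filesystem" now map to FSHLogProvider, and an unrecognized key falls back to a user-supplied class name via conf.getClass. A minimal configuration sketch, with a hypothetical factory id:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.wal.WALFactory;

public class ProviderConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Either legacy key keeps working and now selects FSHLogProvider.
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    WALFactory wals = new WALFactory(conf, null, "provider-sketch"); // hypothetical factory id
    wals.close();
  }
}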
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
  * The Write Ahead Log (WAL) stores all durable edits to the HRegion.
  * This interface provides the entry point for all WAL implementors.
  * <p>
- * See {@link DefaultWALProvider} for an example implementation.
+ * See {@link FSHLogProvider} for an example implementation.
  *
  * A single WALProvider will be used for retrieving multiple WALs in a particular region server
  * and must be threadsafe.
@@ -18,6 +18,12 @@
  */
 package org.apache.hadoop.hbase.wal;

+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
+
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -99,8 +105,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
-// imports for things that haven't moved from regionserver.wal yet.
-import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -119,12 +124,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.ipc.RemoteException;

-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
-
 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
  * files that are no longer being written to, into new files, one per region for
@@ -318,7 +317,7 @@ public class WALSplitter {
       outputSinkStarted = true;
       Entry entry;
       Long lastFlushedSequenceId = -1L;
-      ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logPath);
+      ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logPath);
       failedServerName = (serverName == null) ? "" : serverName.getServerName();
       while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
         byte[] region = entry.getKey().getEncodedRegionName();
@@ -500,7 +499,7 @@ public class WALSplitter {
     }

     for (Path p : processedLogs) {
-      Path newPath = FSHLog.getWALArchivePath(oldLogDir, p);
+      Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
       if (fs.exists(p)) {
         if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
           LOG.warn("Unable to move " + p + " to " + newPath);
@@ -26,14 +26,12 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

-@Category({MiscTests.class, LargeTests.class})
+@Category({ MiscTests.class, LargeTests.class })
 public class TestFullLogReconstruction {

   private final static HBaseTestingUtility
@@ -68,20 +66,6 @@ public class TestFullLogReconstruction {
     TEST_UTIL.shutdownMiniCluster();
   }

-  /**
-   * @throws java.lang.Exception
-   */
-  @Before
-  public void setUp() throws Exception {
-  }
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @After
-  public void tearDown() throws Exception {
-  }
-
   /**
    * Test the whole reconstruction loop. Build a table with regions aaa to zzz
    * and load every one of them multiple times with the same date and do a flush
@@ -58,7 +58,8 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -543,14 +544,14 @@ public class TestAdmin2 {
     byte[] value = Bytes.toBytes(v.toString());
     HRegionServer regionServer = startAndWriteData(TableName.valueOf("TestLogRolling"), value);
     LOG.info("after writing there are "
-        + DefaultWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files");
+        + AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files");

     // flush all regions
     for (Region r : regionServer.getOnlineRegionsLocalContext()) {
       r.flush(true);
     }
     admin.rollWALWriter(regionServer.getServerName());
-    int count = DefaultWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
+    int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
     LOG.info("after flushing all regions and rolling logs there are " +
         count + " log files");
     assertTrue(("actual count: " + count), count <= 2);
@@ -61,7 +61,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -138,7 +138,7 @@ public class TestWALObserver {
     this.oldLogDir = new Path(this.hbaseRootDir,
         HConstants.HREGION_OLDLOGDIR_NAME);
     this.logDir = new Path(this.hbaseRootDir,
-        DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
+        AbstractFSWALProvider.getWALDirectoryName(currentTest.getMethodName()));
     this.logName = HConstants.HREGION_LOGDIR_NAME;

     if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
@@ -38,7 +38,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
@@ -49,10 +48,10 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -63,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
@@ -462,7 +462,7 @@ public class TestBlockReorder {

     // Check that it will be possible to extract a ServerName from our construction
     Assert.assertNotNull("log= " + pseudoLogFile,
-        DefaultWALProvider.getServerNameFromWALDirectoryName(dfs.getConf(), pseudoLogFile));
+        AbstractFSWALProvider.getServerNameFromWALDirectoryName(dfs.getConf(), pseudoLogFile));

     // And check we're doing the right reorder.
     lrb.reorderBlocks(conf, l, pseudoLogFile);
@@ -100,7 +100,8 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALSplitter;
@@ -240,7 +241,7 @@ public class TestDistributedLogSplitting {
       }
       if (foundRs) break;
     }
-    final Path logDir = new Path(rootdir, DefaultWALProvider.getWALDirectoryName(hrs
+    final Path logDir = new Path(rootdir, AbstractFSWALProvider.getWALDirectoryName(hrs
         .getServerName().toString()));

     LOG.info("#regions = " + regions.size());
@@ -1010,7 +1011,7 @@ public class TestDistributedLogSplitting {
     HRegionServer hrs = findRSToKill(false, "table");
     Path rootdir = FSUtils.getRootDir(conf);
     final Path logDir = new Path(rootdir,
-        DefaultWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
+        AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));

     Table t = installTable(new ZooKeeperWatcher(conf, "table-creation", null),
         "table", "family", 40);
@@ -37,9 +37,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@@ -50,10 +47,13 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -167,7 +167,7 @@ public class TestCacheOnWriteInSchema {
     // Create a store based on the schema
     final String id = TestCacheOnWriteInSchema.class.getName();
     final Path logdir = new Path(FSUtils.getRootDir(conf),
-        DefaultWALProvider.getWALDirectoryName(id));
+        AbstractFSWALProvider.getWALDirectoryName(id));
     fs.delete(logdir, true);

     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
@@ -183,7 +183,8 @@ public class TestFSErrorsExposed {
     try {
       // Make it fail faster.
       util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
-
+      util.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
+      util.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
       util.startMiniCluster(1);
       TableName tableName = TableName.valueOf("table");
       byte[] fam = Bytes.toBytes("fam");
@@ -276,7 +277,4 @@ public class TestFSErrorsExposed {
       }
     }
   }
-
-
-
 }
@ -18,10 +18,58 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
|
||||
import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR;
|
||||
import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR;
|
||||
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
|
||||
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
|
||||
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
|
||||
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.commons.lang.RandomStringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -30,6 +78,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ArrayBackedTag;
|
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;

@@ -52,7 +101,6 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Append;

@@ -111,7 +159,7 @@ import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.FaultyFSLog;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;

@@ -132,53 +180,6 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
/**
 * Basic stand-alone testing of HRegion. No clusters!
 *

@@ -984,7 +985,7 @@ public class TestHRegion {

    // now verify that the flush markers are written
    wal.shutdown();
-    WAL.Reader reader = WALFactory.createReader(fs, DefaultWALProvider.getCurrentFileName(wal),
+    WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal),
      TEST_UTIL.getConfiguration());
    try {
      List<WAL.Entry> flushDescriptors = new ArrayList<WAL.Entry>();

@@ -4782,6 +4783,9 @@ public class TestHRegion {
    Path logDir = new Path(new Path(dir + method), "log");
    final Configuration walConf = new Configuration(conf);
    FSUtils.setRootDir(walConf, logDir);
+    // XXX: The spied AsyncFSWAL can not work properly because of a Mockito defect that can not
+    // deal with classes which have a field of an inner class. See discussions in HBASE-15536.
+    walConf.set(WALFactory.WAL_PROVIDER, "filesystem");
    final WALFactory wals = new WALFactory(walConf, null, UUID.randomUUID().toString());
    final WAL wal = spy(wals.getWAL(tableName.getName(), tableName.getNamespace()));
    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,

@@ -4820,6 +4824,7 @@ public class TestHRegion {
    }

    HBaseTestingUtility.closeRegionAndWAL(this.region);
+    wals.close();
    this.region = null;
  }
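
The TestHRegion changes above follow one pattern: the current WAL file is now resolved through the static helper on AbstractFSWALProvider rather than DefaultWALProvider. For illustration, a minimal sketch (not from the commit; it assumes only the getCurrentFileName(WAL) and WALFactory.createReader signatures visible in this diff) of reading entries back out of the active WAL file:

    // Sketch: read every entry from the WAL file currently being written.
    Path walFile = AbstractFSWALProvider.getCurrentFileName(wal);
    WAL.Reader reader = WALFactory.createReader(fs, walFile, conf);
    try {
      WAL.Entry entry;
      while ((entry = reader.next()) != null) {
        // Each entry pairs a WALKey with its WALEdit payload.
        LOG.debug(entry.getKey() + ": " + entry.getEdit().size() + " cell(s)");
      }
    } finally {
      reader.close();
    }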

@@ -35,6 +35,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;

@@ -84,7 +87,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;

@@ -97,9 +100,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-
/**
 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary
 * region replicas

@@ -304,7 +304,7 @@ public class TestHRegionReplayEvents {

  WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
    return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
-      DefaultWALProvider.getCurrentFileName(walPrimary),
+      AbstractFSWALProvider.getCurrentFileName(walPrimary),
      TEST_UTIL.getConfiguration());
  }

@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;

@@ -79,8 +80,9 @@ public class TestMobStoreScanner {
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
-    TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 100*1024*1024);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 100 * 1024 * 1024);
+    // TODO: AsyncFSWAL can not handle large edits right now, remove this after we fix the issue.
+    TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "filesystem");
    TEST_UTIL.startMiniCluster(1);
  }
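
The hbase.wal.provider override above recurs throughout this commit: wherever the new AsyncFSWAL still has a known limitation (large edits here, the Mockito issue in TestHRegion), the test pins the filesystem provider instead. Selecting a provider is a single configuration setting; a hedged sketch (the factory id and region name are illustrative only):

    // Sketch: WALFactory.WAL_PROVIDER ("hbase.wal.provider") selects the provider
    // by name; "filesystem" maps to the FSHLog-based provider and "asyncfs" to the
    // AsyncFSWAL-based one, matching the subclass setups later in this diff.
    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    WALFactory wals = new WALFactory(conf, null, "example-factory-id"); // hypothetical id
    byte[] regionName = Bytes.toBytes("exampleRegion"); // hypothetical identifier
    WAL wal = wals.getWAL(regionName, null); // namespace may be null, as in the tests above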

@@ -48,12 +48,12 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Ignore;
import org.junit.Test;

@@ -436,7 +436,7 @@ public class TestPerColumnFamilyFlush {
  }

  private int getNumRolledLogFiles(Region region) {
-    return ((FSHLog)getWAL(region)).getNumRolledLogFiles();
+    return AbstractFSWALProvider.getNumRolledLogFiles(getWAL(region));
  }

  /**
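
The getNumRolledLogFiles change is the clearest example of why the rename helps: the old code had to cast the WAL to the concrete FSHLog, while the new static helper on AbstractFSWALProvider accepts any WAL from an FS-based provider. A side-by-side sketch (the old line is the one removed above):

    // Old: the test only works when the provider is known to hand back an FSHLog.
    int before = ((FSHLog) getWAL(region)).getNumRolledLogFiles();

    // New: the static helper does the narrowing itself, so the same test code
    // covers both the "filesystem" and "asyncfs" providers.
    int after = AbstractFSWALProvider.getNumRolledLogFiles(getWAL(region));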

@@ -28,6 +28,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

+import com.google.common.collect.Lists;

import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;

@@ -60,8 +61,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.io.compress.Compression;

@@ -74,15 +73,17 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
import org.junit.Assert;

@@ -170,7 +171,7 @@ public class TestStore {
    //Setting up a Store
    Path basedir = new Path(DIR+methodName);
    Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
-    final Path logdir = new Path(basedir, DefaultWALProvider.getWALDirectoryName(methodName));
+    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));

    FileSystem fs = FileSystem.get(conf);

@@ -60,7 +60,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.AfterClass;

@@ -194,7 +194,7 @@ public abstract class AbstractTestFSWAL {
    assertTrue(comp.compare(p1, p2) < 0);
    walMeta =
      newWAL(FS, FSUtils.getRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
-        CONF, null, true, null, DefaultWALProvider.META_WAL_PROVIDER_ID);
+        CONF, null, true, null, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
    Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;

    Path p1WithMeta = walMeta.computeFilename(11);

@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertFalse;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests that verifies that the log is forced to be rolled every "hbase.regionserver.logroll.period"
+ */
+public abstract class AbstractTestLogRollPeriod {
+  private static final Log LOG = LogFactory.getLog(AbstractTestLogRollPeriod.class);
+
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private final static long LOG_ROLL_PERIOD = 4000;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // disable the ui
+    TEST_UTIL.getConfiguration().setInt("hbase.regionsever.info.port", -1);
+
+    TEST_UTIL.getConfiguration().setLong("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
+
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Tests that the LogRoller perform the roll even if there are no edits
+   */
+  @Test
+  public void testNoEdits() throws Exception {
+    TableName tableName = TableName.valueOf("TestLogRollPeriodNoEdits");
+    TEST_UTIL.createTable(tableName, "cf");
+    try {
+      Table table = TEST_UTIL.getConnection().getTable(tableName);
+      try {
+        HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(tableName);
+        WAL log = server.getWAL(null);
+        checkMinLogRolls(log, 5);
+      } finally {
+        table.close();
+      }
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  /**
+   * Tests that the LogRoller perform the roll with some data in the log
+   */
+  @Test(timeout=60000)
+  public void testWithEdits() throws Exception {
+    final TableName tableName = TableName.valueOf("TestLogRollPeriodWithEdits");
+    final String family = "cf";
+
+    TEST_UTIL.createTable(tableName, family);
+    try {
+      HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(tableName);
+      WAL log = server.getWAL(null);
+      final Table table = TEST_UTIL.getConnection().getTable(tableName);
+
+      Thread writerThread = new Thread("writer") {
+        @Override
+        public void run() {
+          try {
+            long row = 0;
+            while (!interrupted()) {
+              Put p = new Put(Bytes.toBytes(String.format("row%d", row)));
+              p.addColumn(Bytes.toBytes(family), Bytes.toBytes("col"), Bytes.toBytes(row));
+              table.put(p);
+              row++;
+
+              Thread.sleep(LOG_ROLL_PERIOD / 16);
+            }
+          } catch (Exception e) {
+            LOG.warn(e);
+          }
+        }
+      };
+
+      try {
+        writerThread.start();
+        checkMinLogRolls(log, 5);
+      } finally {
+        writerThread.interrupt();
+        writerThread.join();
+        table.close();
+      }
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  private void checkMinLogRolls(final WAL log, final int minRolls)
+      throws Exception {
+    final List<Path> paths = new ArrayList<Path>();
+    log.registerWALActionsListener(new WALActionsListener.Base() {
+      @Override
+      public void postLogRoll(Path oldFile, Path newFile) {
+        LOG.debug("postLogRoll: oldFile="+oldFile+" newFile="+newFile);
+        paths.add(newFile);
+      }
+    });
+
+    // Sleep until we should get at least min-LogRoll events
+    long wtime = System.currentTimeMillis();
+    Thread.sleep((minRolls + 1) * LOG_ROLL_PERIOD);
+    // Do some extra sleep in case the machine is slow,
+    // and the log-roll is not triggered exactly on LOG_ROLL_PERIOD.
+    final int NUM_RETRIES = 1 + 8 * (minRolls - paths.size());
+    for (int retry = 0; paths.size() < minRolls && retry < NUM_RETRIES; ++retry) {
+      Thread.sleep(LOG_ROLL_PERIOD / 4);
+    }
+    wtime = System.currentTimeMillis() - wtime;
+    LOG.info(String.format("got %d rolls after %dms (%dms each) - expected at least %d rolls",
+      paths.size(), wtime, wtime / paths.size(), minRolls));
+    assertFalse(paths.size() < minRolls);
+  }
+}
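
The new abstract class above is the template this commit applies across the WAL tests: the test body lives in an Abstract* base, and each provider gets a thin subclass that only pins hbase.wal.provider before the mini cluster starts. A sketch of such a subclass (the class name is hypothetical; TestLogRollPeriod and TestAsyncLogRollPeriod later in this diff are the real instances):

    // Sketch of the per-provider subclass pattern used throughout this commit.
    @Category({ RegionServerTests.class, MediumTests.class })
    public class TestSomeProviderLogRollPeriod extends AbstractTestLogRollPeriod {

      @BeforeClass
      public static void setUpBeforeClass() throws Exception {
        // Choose the provider before the base class starts the mini cluster.
        Configuration conf = AbstractTestLogRollPeriod.TEST_UTIL.getConfiguration();
        conf.set(WALFactory.WAL_PROVIDER, "filesystem"); // or "asyncfs"
        AbstractTestLogRollPeriod.setUpBeforeClass();
      }
    }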

@@ -45,7 +45,8 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;

@@ -184,7 +185,7 @@ public abstract class AbstractTestLogRolling {
    HRegionInfo region =
      server.getOnlineRegions(TableName.valueOf(tableName)).get(0).getRegionInfo();
    final WAL log = server.getWAL(region);
-    LOG.info("after writing there are " + DefaultWALProvider.getNumRolledLogFiles(log) +
+    LOG.info("after writing there are " + AbstractFSWALProvider.getNumRolledLogFiles(log) +
      " log files");

    // flush all regions

@@ -195,7 +196,7 @@ public abstract class AbstractTestLogRolling {
    // Now roll the log
    log.rollWriter();

-    int count = DefaultWALProvider.getNumRolledLogFiles(log);
+    int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
    LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
    assertTrue(("actual count: " + count), count <= 2);
  }

@@ -226,34 +227,6 @@ public abstract class AbstractTestLogRolling {
    LOG.info("Validated row " + row);
  }

-  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
-      throws IOException {
-    for (int i = 0; i < 10; i++) {
-      Put put = new Put(Bytes.toBytes("row"
-          + String.format("%1$04d", (start + i))));
-      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
-      table.put(put);
-    }
-    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
-    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
-    long startTime = System.currentTimeMillis();
-    long remaining = timeout;
-    while (remaining > 0) {
-      if (log.isLowReplicationRollEnabled() == expect) {
-        break;
-      } else {
-        // Trigger calling FSHlog#checkLowReplication()
-        table.put(tmpPut);
-        try {
-          Thread.sleep(200);
-        } catch (InterruptedException e) {
-          // continue
-        }
-        remaining = timeout - (System.currentTimeMillis() - startTime);
-      }
-    }
-  }
-
  /**
   * Tests that logs are deleted when some region has a compaction
   * record in WAL and no other records. See HBASE-8597.

@@ -282,13 +255,13 @@ public abstract class AbstractTestLogRolling {
      }
      doPut(table, 3); // don't flush yet, or compaction might trigger before we roll WAL
      assertEquals("Should have no WAL after initial writes", 0,
-        DefaultWALProvider.getNumRolledLogFiles(log));
+        AbstractFSWALProvider.getNumRolledLogFiles(log));
      assertEquals(2, s.getStorefilesCount());

      // Roll the log and compact table, to have compaction record in the 2nd WAL.
      log.rollWriter();
      assertEquals("Should have WAL; one table is not flushed", 1,
-        DefaultWALProvider.getNumRolledLogFiles(log));
+        AbstractFSWALProvider.getNumRolledLogFiles(log));
      admin.flush(table.getName());
      region.compact(false);
      // Wait for compaction in case if flush triggered it before us.

@@ -302,14 +275,14 @@ public abstract class AbstractTestLogRolling {
      doPut(table, 0); // Now 2nd WAL will have both compaction and put record for table.
      log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
      assertEquals("Should have WAL; one table is not flushed", 1,
-        DefaultWALProvider.getNumRolledLogFiles(log));
+        AbstractFSWALProvider.getNumRolledLogFiles(log));

      // Flush table to make latest WAL obsolete; write another record, and roll again.
      admin.flush(table.getName());
      doPut(table, 1);
      log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
      assertEquals("Should have 1 WALs at the end", 1,
-        DefaultWALProvider.getNumRolledLogFiles(log));
+        AbstractFSWALProvider.getNumRolledLogFiles(log));
    } finally {
      if (t != null) t.close();
      if (table != null) table.close();

(File diff suppressed because it is too large.)

@@ -35,7 +35,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;

@@ -54,7 +54,7 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 * Hadoop serialization).
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class SequenceFileLogWriter implements DefaultWALProvider.Writer {
+public class SequenceFileLogWriter implements FSHLogProvider.Writer {
  private static final Log LOG = LogFactory.getLog(SequenceFileLogWriter.class);
  // The sequence file we delegate to.
  private SequenceFile.Writer writer;

@@ -25,12 +25,12 @@ import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;

@Category({ RegionServerTests.class, MediumTests.class })
-public class TestAsyncLogRollPeriod extends TestLogRollPeriod {
+public class TestAsyncLogRollPeriod extends AbstractTestLogRollPeriod {

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TestLogRollPeriod.TEST_UTIL.getConfiguration();
+    Configuration conf = AbstractTestLogRollPeriod.TEST_UTIL.getConfiguration();
    conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
-    TestLogRollPeriod.setUpBeforeClass();
+    AbstractTestLogRollPeriod.setUpBeforeClass();
  }
}

@@ -17,20 +17,45 @@
 */
package org.apache.hadoop.hbase.regionserver.wal;

+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;

@Category({ RegionServerTests.class, MediumTests.class })
-public class TestAsyncWALReplay extends TestWALReplay {
+public class TestAsyncWALReplay extends AbstractTestWALReplay {
+
+  private static EventLoopGroup GROUP;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
+    GROUP = new NioEventLoopGroup();
+    Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
    conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
-    TestWALReplay.setUpBeforeClass();
+    AbstractTestWALReplay.setUpBeforeClass();
  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    AbstractTestWALReplay.tearDownAfterClass();
+    GROUP.shutdownGracefully();
+  }
+
+  @Override
+  protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
+    return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
+      HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next());
+  }
}
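
A detail worth noting in TestAsyncWALReplay above: a single Netty event loop group is created in @BeforeClass, one loop is handed to each AsyncFSWAL via GROUP.next(), and the group is shut down in @AfterClass after the base class tears down. Purely as a sketch of that lifecycle in isolation:

    // Sketch: the event-loop lifecycle the async test relies on.
    EventLoopGroup group = new NioEventLoopGroup();
    try {
      EventLoop loop = group.next(); // one loop can serve one WAL instance
      // ... construct the AsyncFSWAL with 'loop' and run the test ...
    } finally {
      group.shutdownGracefully(); // releases the Netty threads
    }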

@@ -21,18 +21,16 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;

@Category({ RegionServerTests.class, MediumTests.class })
-public class TestAsyncWALReplayCompressed extends TestWALReplay {
+public class TestAsyncWALReplayCompressed extends TestAsyncWALReplay {

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
-    conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
+    Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
-    TestWALReplay.setUpBeforeClass();
+    TestAsyncWALReplay.setUpBeforeClass();
  }
}

@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;

@@ -251,7 +251,7 @@ public class TestDurability {
  }

  private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
-    Path walPath = DefaultWALProvider.getCurrentFileName(log);
+    Path walPath = AbstractFSWALProvider.getCurrentFileName(log);
    WAL.Reader reader = wals.createReader(FS, walPath);
    int count = 0;
    WAL.Entry entry = new WAL.Entry();

@@ -17,18 +17,13 @@
 */
package org.apache.hadoop.hbase.regionserver.wal;

+import static org.junit.Assert.assertTrue;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;

-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.junit.Assert;
-import static org.junit.Assert.assertTrue;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;

@@ -44,16 +39,22 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

@@ -96,6 +97,7 @@ public class TestLogRollAbort {
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so try to a new pipeline multiple times
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 10);
+    TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "filesystem");
  }

  private Configuration conf;

@@ -183,7 +185,7 @@ public class TestLogRollAbort {
  public void testLogRollAfterSplitStart() throws IOException {
    LOG.info("Verify wal roll after split starts will fail.");
    String logName = "testLogRollAfterSplitStart";
-    Path thisTestsDir = new Path(HBASEDIR, DefaultWALProvider.getWALDirectoryName(logName));
+    Path thisTestsDir = new Path(HBASEDIR, AbstractFSWALProvider.getWALDirectoryName(logName));
    final WALFactory wals = new WALFactory(conf, null, logName);

    try {

@@ -218,7 +220,7 @@ public class TestLogRollAbort {
       * handles RS shutdowns (as observed by the splitting process)
       */
      // rename the directory so a rogue RS doesn't create more WALs
-      Path rsSplitDir = thisTestsDir.suffix(DefaultWALProvider.SPLITTING_EXT);
+      Path rsSplitDir = thisTestsDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
      if (!fs.rename(thisTestsDir, rsSplitDir)) {
        throw new IOException("Failed fs.rename for log split: " + thisTestsDir);
      }
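
For context on the SPLITTING_EXT usage above: a region server's WAL directory is derived from its server name, and log splitting renames that directory with the splitting suffix so the (possibly still running) server cannot add new files while the split worker drains it. A hedged sketch of the relationship, using only the helpers visible in this diff (rootDir, serverName and fs are assumed to be in scope):

    // Sketch: a WAL dir and its "-splitting" twin.
    Path walDir = new Path(rootDir,
        AbstractFSWALProvider.getWALDirectoryName(serverName)); // e.g. <root>/WALs/<server>
    Path splitDir = walDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
    if (!fs.rename(walDir, splitDir)) {
      throw new IOException("Failed fs.rename for log split: " + walDir);
    }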

@@ -17,144 +17,20 @@
 */
package org.apache.hadoop.hbase.regionserver.wal;

-import static org.junit.Assert.assertFalse;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.junit.AfterClass;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
-import org.junit.Test;
import org.junit.experimental.categories.Category;

-/**
- * Tests that verifies that the log is forced to be rolled every "hbase.regionserver.logroll.period"
- */
-@Category({RegionServerTests.class, MediumTests.class})
-public class TestLogRollPeriod {
-  private static final Log LOG = LogFactory.getLog(AbstractTestLogRolling.class);
-
-  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private final static long LOG_ROLL_PERIOD = 4000;
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestLogRollPeriod extends AbstractTestLogRollPeriod {

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
-    // disable the ui
-    TEST_UTIL.getConfiguration().setInt("hbase.regionsever.info.port", -1);
-
-    TEST_UTIL.getConfiguration().setLong("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
-
-    TEST_UTIL.startMiniCluster();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  /**
-   * Tests that the LogRoller perform the roll even if there are no edits
-   */
-  @Test
-  public void testNoEdits() throws Exception {
-    TableName tableName = TableName.valueOf("TestLogRollPeriodNoEdits");
-    TEST_UTIL.createTable(tableName, "cf");
-    try {
-      Table table = TEST_UTIL.getConnection().getTable(tableName);
-      try {
-        HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(tableName);
-        WAL log = server.getWAL(null);
-        checkMinLogRolls(log, 5);
-      } finally {
-        table.close();
-      }
-    } finally {
-      TEST_UTIL.deleteTable(tableName);
-    }
-  }
-
-  /**
-   * Tests that the LogRoller perform the roll with some data in the log
-   */
-  @Test(timeout=60000)
-  public void testWithEdits() throws Exception {
-    final TableName tableName = TableName.valueOf("TestLogRollPeriodWithEdits");
-    final String family = "cf";
-
-    TEST_UTIL.createTable(tableName, family);
-    try {
-      HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(tableName);
-      WAL log = server.getWAL(null);
-      final Table table = TEST_UTIL.getConnection().getTable(tableName);
-
-      Thread writerThread = new Thread("writer") {
-        @Override
-        public void run() {
-          try {
-            long row = 0;
-            while (!interrupted()) {
-              Put p = new Put(Bytes.toBytes(String.format("row%d", row)));
-              p.addColumn(Bytes.toBytes(family), Bytes.toBytes("col"), Bytes.toBytes(row));
-              table.put(p);
-              row++;
-
-              Thread.sleep(LOG_ROLL_PERIOD / 16);
-            }
-          } catch (Exception e) {
-            LOG.warn(e);
-          }
-        }
-      };
-
-      try {
-        writerThread.start();
-        checkMinLogRolls(log, 5);
-      } finally {
-        writerThread.interrupt();
-        writerThread.join();
-        table.close();
-      }
-    } finally {
-      TEST_UTIL.deleteTable(tableName);
-    }
-  }
-
-  private void checkMinLogRolls(final WAL log, final int minRolls)
-      throws Exception {
-    final List<Path> paths = new ArrayList<Path>();
-    log.registerWALActionsListener(new WALActionsListener.Base() {
-      @Override
-      public void postLogRoll(Path oldFile, Path newFile) {
-        LOG.debug("postLogRoll: oldFile="+oldFile+" newFile="+newFile);
-        paths.add(newFile);
-      }
-    });
-
-    // Sleep until we should get at least min-LogRoll events
-    long wtime = System.currentTimeMillis();
-    Thread.sleep((minRolls + 1) * LOG_ROLL_PERIOD);
-    // Do some extra sleep in case the machine is slow,
-    // and the log-roll is not triggered exactly on LOG_ROLL_PERIOD.
-    final int NUM_RETRIES = 1 + 8 * (minRolls - paths.size());
-    for (int retry = 0; paths.size() < minRolls && retry < NUM_RETRIES; ++retry) {
-      Thread.sleep(LOG_ROLL_PERIOD / 4);
-    }
-    wtime = System.currentTimeMillis() - wtime;
-    LOG.info(String.format("got %d rolls after %dms (%dms each) - expected at least %d rolls",
-      paths.size(), wtime, wtime / paths.size(), minRolls));
-    assertFalse(paths.size() < minRolls);
+    Configuration conf = AbstractTestLogRollPeriod.TEST_UTIL.getConfiguration();
+    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
+    AbstractTestLogRollPeriod.setUpBeforeClass();
  }
}

@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

@@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

@@ -50,7 +52,8 @@ import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

@@ -82,9 +85,37 @@ public class TestLogRolling extends AbstractTestLogRolling {
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
+    TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "filesystem");
    AbstractTestLogRolling.setUpBeforeClass();
  }

+  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
+      throws IOException {
+    for (int i = 0; i < 10; i++) {
+      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i))));
+      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
+      table.put(put);
+    }
+    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
+    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
+    long startTime = System.currentTimeMillis();
+    long remaining = timeout;
+    while (remaining > 0) {
+      if (log.isLowReplicationRollEnabled() == expect) {
+        break;
+      } else {
+        // Trigger calling FSHlog#checkLowReplication()
+        table.put(tmpPut);
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException e) {
+          // continue
+        }
+        remaining = timeout - (System.currentTimeMillis() - startTime);
+      }
+    }
+  }
+
  /**
   * Tests that logs are rolled upon detecting datanode death Requires an HDFS jar with HDFS-826 &
   * syncFs() support (HDFS-200)

@@ -148,12 +179,12 @@ public class TestLogRolling extends AbstractTestLogRolling {

    long curTime = System.currentTimeMillis();
    LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName());
-    long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log);
+    long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
    assertTrue("Log should have a timestamp older than now",
      curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
-      oldFilenum == DefaultWALProvider.extractFileNumFromWAL(log));
+      oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));
    final DatanodeInfo[] pipeline = log.getPipeline();
    assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

@@ -163,7 +194,7 @@ public class TestLogRolling extends AbstractTestLogRolling {

    // this write should succeed, but trigger a log roll
    writeData(table, 2);
-    long newFilenum = DefaultWALProvider.extractFileNumFromWAL(log);
+    long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

    assertTrue("Missing datanode should've triggered a log roll",
      newFilenum > oldFilenum && newFilenum > curTime);

@@ -174,7 +205,7 @@ public class TestLogRolling extends AbstractTestLogRolling {
    // write some more log data (this should use a new hdfs_out)
    writeData(table, 3);
    assertTrue("The log should not roll again.",
-      DefaultWALProvider.extractFileNumFromWAL(log) == newFilenum);
+      AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum);
    // kill another datanode in the pipeline, so the replicas will be lower than
    // the configured value 2.
    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

@@ -224,7 +255,7 @@ public class TestLogRolling extends AbstractTestLogRolling {
    final List<Path> paths = new ArrayList<Path>();
    final List<Integer> preLogRolledCalled = new ArrayList<Integer>();

-    paths.add(DefaultWALProvider.getCurrentFileName(log));
+    paths.add(AbstractFSWALProvider.getCurrentFileName(log));
    log.registerWALActionsListener(new WALActionsListener.Base() {

      @Override

@@ -246,13 +277,13 @@ public class TestLogRolling extends AbstractTestLogRolling {
    writeData(table, 1002);

    long curTime = System.currentTimeMillis();
-    LOG.info("log.getCurrentFileName()): " + DefaultWALProvider.getCurrentFileName(log));
-    long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log);
+    LOG.info("log.getCurrentFileName()): " + AbstractFSWALProvider.getCurrentFileName(log));
+    long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
    assertTrue("Log should have a timestamp older than now",
      curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
-      oldFilenum == DefaultWALProvider.extractFileNumFromWAL(log));
+      oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));

    // roll all datanodes in the pipeline
    dfsCluster.restartDataNodes();

@@ -263,7 +294,7 @@ public class TestLogRolling extends AbstractTestLogRolling {

    // this write should succeed, but trigger a log roll
    writeData(table, 1003);
-    long newFilenum = DefaultWALProvider.extractFileNumFromWAL(log);
+    long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

    assertTrue("Missing datanode should've triggered a log roll",
      newFilenum > oldFilenum && newFilenum > curTime);

@@ -70,6 +70,7 @@ public class TestLogRollingNoCluster {
    // The implementation needs to know the 'handler' count.
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
    final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    FSUtils.setRootDir(conf, dir);
    final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
    final WAL wal = wals.getWAL(new byte[]{}, null);

@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.junit.experimental.categories.Category;

@@ -33,7 +33,7 @@ public class TestProtobufLog extends AbstractTestProtobufLog<WALProvider.Writer>

  @Override
  protected Writer createWriter(Path path) throws IOException {
-    return DefaultWALProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, false);
+    return FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, false);
  }

  @Override
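
The writer factory moved with the provider split: FSHLogProvider now owns createWriter for the synchronous filesystem WAL. A minimal sketch of creating and closing a writer with the call shape shown above (the target path is illustrative; the final boolean mirrors the test's 'false'):

    // Sketch: obtain a WAL writer through the relocated factory method.
    Configuration conf = TEST_UTIL.getConfiguration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(dir, "example.wal"); // hypothetical target file
    WALProvider.Writer writer = FSHLogProvider.createWriter(conf, fs, path, false);
    try {
      // append WAL entries here, syncing before relying on durability
    } finally {
      writer.close();
    }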

@@ -32,7 +32,7 @@ public class TestSecureAsyncWALReplay extends TestAsyncWALReplay {

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
+    Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
    conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
    conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
    conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,

@@ -19,21 +19,20 @@ package org.apache.hadoop.hbase.regionserver.wal;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
-
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;

-@Category({RegionServerTests.class, MediumTests.class})
+@Category({ RegionServerTests.class, MediumTests.class })
public class TestSecureWALReplay extends TestWALReplay {

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
+    Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
    conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
    conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
    conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,

@@ -41,7 +40,6 @@ public class TestSecureWALReplay extends TestWALReplay {
    conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
      Writer.class);
    conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
-    TestWALReplay.setUpBeforeClass();
+    AbstractTestWALReplay.setUpBeforeClass();
  }
-
}

(File diff suppressed because it is too large.)

@@ -27,13 +27,13 @@ import org.junit.experimental.categories.Category;
/**
 * Enables compression and runs the TestWALReplay tests.
 */
-@Category({RegionServerTests.class, MediumTests.class})
+@Category({ RegionServerTests.class, MediumTests.class })
public class TestWALReplayCompressed extends TestWALReplay {

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
-    TestWALReplay.setUpBeforeClass();
-    Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
+    Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    TestWALReplay.setUpBeforeClass();
  }
}

@@ -18,36 +18,34 @@
 */
package org.apache.hadoop.hbase.wal;

+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.DEFAULT_PROVIDER_ID;
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
+
import java.io.IOException;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-
-import static org.apache.hadoop.hbase.wal.DefaultWALProvider.DEFAULT_PROVIDER_ID;
-import static org.apache.hadoop.hbase.wal.DefaultWALProvider.META_WAL_PROVIDER_ID;
-import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
-
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
// imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL.Entry;

/**
 * A WAL Provider that returns a single thread safe WAL that optionally can skip parts of our
 * normal interactions with HDFS.
 *
 * This implementation picks a directory in HDFS based on the same mechanisms as the
- * {@link DefaultWALProvider}. Users can configure how much interaction
+ * {@link FSHLogProvider}. Users can configure how much interaction
 * we have with HDFS with the configuration property "hbase.wal.iotestprovider.operations".
 * The value should be a comma separated list of allowed operations:
 * <ul>

@@ -102,9 +100,9 @@ public class IOTestProvider implements WALProvider {
    }
    final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
    log = new IOTestWAL(FileSystem.get(conf), FSUtils.getRootDir(conf),
-      DefaultWALProvider.getWALDirectoryName(factory.factoryId),
-      HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
-      true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
+      AbstractFSWALProvider.getWALDirectoryName(factory.factoryId),
+      HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix,
+      META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
  }

  @Override

@@ -150,7 +148,7 @@ public class IOTestProvider implements WALProvider {
   * it will be URL encoded before being used.
   * If prefix is null, "wal" will be used
   * @param suffix will be url encoded. null is treated as empty. non-empty must start with
-   *          {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER}
+   *          {@link AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER}
   * @throws IOException
   */
  public IOTestWAL(final FileSystem fs, final Path rootDir, final String logDir,

@@ -28,7 +28,7 @@ import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;

@Category({RegionServerTests.class, LargeTests.class})
-public class TestDefaultWALProviderWithHLogKey extends TestDefaultWALProvider {
+public class TestDefaultWALProviderWithHLogKey extends TestFSHLogProvider {
  @Override
  WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp,
      final NavigableMap<byte[], Integer> scopes) {
@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.KeyValue;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
// imports for things that haven't moved from regionserver.wal yet.
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -59,12 +61,9 @@ import org.junit.Test;
|
|||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
// imports for things that haven't moved from regionserver.wal yet.
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
|
||||
@Category({RegionServerTests.class, MediumTests.class})
|
||||
public class TestDefaultWALProvider {
|
||||
private static final Log LOG = LogFactory.getLog(TestDefaultWALProvider.class);
|
||||
public class TestFSHLogProvider {
|
||||
private static final Log LOG = LogFactory.getLog(TestFSHLogProvider.class);
|
||||
|
||||
protected static Configuration conf;
|
||||
protected static FileSystem fs;
|
||||
|
@ -124,28 +123,28 @@ public class TestDefaultWALProvider {
|
|||
public void testGetServerNameFromWALDirectoryName() throws IOException {
|
||||
ServerName sn = ServerName.valueOf("hn", 450, 1398);
|
||||
String hl = FSUtils.getRootDir(conf) + "/" +
|
||||
DefaultWALProvider.getWALDirectoryName(sn.toString());
|
||||
AbstractFSWALProvider.getWALDirectoryName(sn.toString());
|
||||
|
||||
// Must not throw exception
|
||||
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, null));
|
||||
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
|
||||
assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, null));
|
||||
assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf,
|
||||
FSUtils.getRootDir(conf).toUri().toString()));
|
||||
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, ""));
|
||||
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, " "));
|
||||
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl));
|
||||
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl + "qdf"));
|
||||
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, "sfqf" + hl + "qdf"));
|
||||
assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, ""));
|
||||
assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, " "));
|
||||
assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, hl));
|
||||
assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, hl + "qdf"));
|
||||
assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, "sfqf" + hl + "qdf"));
|
||||
|
||||
final String wals = "/WALs/";
|
||||
ServerName parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
|
||||
ServerName parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf,
|
||||
FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
|
||||
"/localhost%2C32984%2C1343316388997.1343316390417");
|
||||
assertEquals("standard", sn, parsed);
|
||||
|
||||
parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl + "/qdf");
|
||||
parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, hl + "/qdf");
|
||||
assertEquals("subdir", sn, parsed);
|
||||
|
||||
parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
|
||||
parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf,
|
||||
FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
|
||||
"-splitting/localhost%3A57020.1340474893931");
|
||||
assertEquals("split", sn, parsed);
|
||||
|
@ -206,7 +205,7 @@ public class TestDefaultWALProvider {
|
|||
scopes2.put(fam, 0);
|
||||
}
|
||||
final Configuration localConf = new Configuration(conf);
|
||||
localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
|
||||
localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName());
|
||||
final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
|
||||
final AtomicLong sequenceId = new AtomicLong(1);
|
||||
try {
|
||||
|
@ -221,12 +220,12 @@ public class TestDefaultWALProvider {
|
|||
// Before HBASE-3198 it used to delete it
|
||||
addEdits(log, hri, htd, 1, scopes1);
|
||||
log.rollWriter();
|
||||
assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(log));
|
||||
assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(log));
|
||||
|
||||
// See if there's anything wrong with more than 1 edit
|
||||
addEdits(log, hri, htd, 2, scopes1);
|
||||
log.rollWriter();
|
||||
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
|
||||
assertEquals(2, FSHLogProvider.getNumRolledLogFiles(log));
|
||||
|
||||
// Now mix edits from 2 regions, still no flushing
|
||||
addEdits(log, hri, htd, 1, scopes1);
|
||||
|
@ -234,7 +233,7 @@ public class TestDefaultWALProvider {
|
|||
addEdits(log, hri, htd, 1, scopes1);
|
||||
addEdits(log, hri2, htd2, 1, scopes2);
|
||||
log.rollWriter();
|
||||
assertEquals(3, DefaultWALProvider.getNumRolledLogFiles(log));
|
||||
assertEquals(3, AbstractFSWALProvider.getNumRolledLogFiles(log));
|
||||
|
||||
// Flush the first region, we expect to see the first two files getting
|
||||
// archived. We need to append something or writer won't be rolled.
|
||||
|
@ -242,7 +241,7 @@ public class TestDefaultWALProvider {
|
|||
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
|
||||
log.completeCacheFlush(hri.getEncodedNameAsBytes());
|
||||
log.rollWriter();
|
||||
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
|
||||
assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(log));
|
||||
|
||||
// Flush the second region, which removes all the remaining output files
|
||||
// since the oldest was completely flushed and the two others only contain
|
||||
|
@ -251,7 +250,7 @@ public class TestDefaultWALProvider {
|
|||
log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getFamiliesKeys());
|
||||
log.completeCacheFlush(hri2.getEncodedNameAsBytes());
|
||||
log.rollWriter();
|
||||
assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(log));
|
||||
assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(log));
|
||||
} finally {
|
||||
if (wals != null) {
|
||||
wals.close();
|
||||
@@ -289,11 +288,11 @@ public class TestDefaultWALProvider {
      scopes2.put(fam, 0);
    }
    final Configuration localConf = new Configuration(conf);
-    localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
+    localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName());
    final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
    try {
      final WAL wal = wals.getWAL(UNSPECIFIED_REGION, null);
-      assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
+      assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
      HRegionInfo hri1 =
          new HRegionInfo(table1.getTableName(), HConstants.EMPTY_START_ROW,
              HConstants.EMPTY_END_ROW);
@@ -308,26 +307,26 @@ public class TestDefaultWALProvider {
      addEdits(wal, hri1, table1, 1, scopes1);
      wal.rollWriter();
      // assert that the wal is rolled
-      assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal));
+      assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(wal));
      // add edits in the second wal file, and roll writer.
      addEdits(wal, hri1, table1, 1, scopes1);
      wal.rollWriter();
      // assert that the wal is rolled
-      assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
+      assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal));
      // add a waledit to table1, and flush the region.
      addEdits(wal, hri1, table1, 3, scopes1);
      flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys());
      // roll log; all old logs should be archived.
      wal.rollWriter();
-      assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
+      assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
      // add an edit to table2, and roll writer
      addEdits(wal, hri2, table2, 1, scopes2);
      wal.rollWriter();
-      assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal));
+      assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(wal));
      // add edits for table1, and roll writer
      addEdits(wal, hri1, table1, 2, scopes1);
      wal.rollWriter();
-      assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
+      assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal));
      // add edits for table2, and flush hri1.
      addEdits(wal, hri2, table2, 2, scopes2);
      flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getFamiliesKeys());
@@ -337,12 +336,12 @@ public class TestDefaultWALProvider {
      // log3: region2 (unflushed)
      // roll the writer; log2 should be archived.
      wal.rollWriter();
-      assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
+      assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal));
      // flush region2, and all logs should be archived.
      addEdits(wal, hri2, table2, 2, scopes2);
      flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getFamiliesKeys());
      wal.rollWriter();
-      assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
+      assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
    } finally {
      if (wals != null) {
        wals.close();
@@ -370,7 +369,7 @@ public class TestDefaultWALProvider {
  @Test
  public void setMembershipDedups() throws IOException {
    final Configuration localConf = new Configuration(conf);
-    localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
+    localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName());
    final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
    try {
      final Set<WAL> seen = new HashSet<WAL>(1);
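setMembershipDedups relies on FSHLogProvider being a single-WAL provider: every region lookup yields the same WAL instance, so a set built from repeated getWAL calls never grows past one element. A sketch of that check, assuming the getWAL(byte[], byte[]) signature used above:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.hadoop.hbase.wal.WAL;
    import org.apache.hadoop.hbase.wal.WALFactory;

    final class WalDedupSketch {
      // Returns the number of distinct WAL instances across the given regions;
      // with a single-WAL provider the expected result is 1.
      static int distinctWals(WALFactory wals, byte[][] regions) throws Exception {
        Set<WAL> seen = new HashSet<>();
        for (byte[] region : regions) {
          seen.add(wals.getWAL(region, null));
        }
        return seen.size();
      }
    }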
@@ -141,7 +141,7 @@ public class TestSecureWAL {
          System.currentTimeMillis(), scopes), kvs, true);
    }
    wal.sync();
-    final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
+    final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
    wals.shutdown();

    // Insure edits are not plaintext
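The "Insure edits are not plaintext" step amounts to scanning the raw bytes of the file at walPath for a known cell value and expecting not to find it. One way such a check can be written against the Hadoop FileSystem API (a sketch, not the test's actual implementation):

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class PlaintextScanSketch {
      // True if the needle bytes occur verbatim in the file; for an encrypted
      // WAL, a known cell value should never be found this way.
      static boolean containsPlaintext(FileSystem fs, Path walPath, byte[] needle)
          throws IOException {
        long len = fs.getFileStatus(walPath).getLen();
        byte[] raw = new byte[(int) len];
        try (FSDataInputStream in = fs.open(walPath)) {
          in.readFully(0, raw);
        }
        outer:
        for (int i = 0; i + needle.length <= raw.length; i++) {
          for (int j = 0; j < needle.length; j++) {
            if (raw[i + j] != needle[j]) {
              continue outer;
            }
          }
          return true;
        }
        return false;
      }
    }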
@@ -138,6 +138,8 @@ public class TestWALFactory {
        "dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt(
        "hbase.ipc.client.connection.maxidletime", 500);
+    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
+    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
        SampleRegionWALObserver.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);
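The two lease-recovery settings added above bound how long WAL tests wait for HDFS to release the dead writer's lease: roughly an overall budget and a per-attempt timeout (the precise semantics of each key are an assumption here, and the values are the test's, far tighter than production defaults). As a sketch:

    import org.apache.hadoop.conf.Configuration;

    final class LeaseRecoveryConfigSketch {
      // Tightens lease recovery so a mini-cluster test fails fast instead of
      // waiting out production-scale timeouts. Key semantics are assumed: the
      // first is the overall recovery budget, the second the wait per
      // recoverLease attempt, both in milliseconds.
      static void tightenForTests(Configuration conf) {
        conf.setInt("hbase.lease.recovery.timeout", 10000);
        conf.setInt("hbase.lease.recovery.dfs.timeout", 1000);
      }
    }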
@@ -169,7 +171,7 @@ public class TestWALFactory {
    final byte [] rowName = tableName.getName();
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
    final Path logdir = new Path(hbaseDir,
-        DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
+        AbstractFSWALProvider.getWALDirectoryName(currentTest.getMethodName()));
    Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    final int howmany = 3;
    HRegionInfo[] infos = new HRegionInfo[3];
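getWALDirectoryName ties these tests to the on-disk layout: live WALs sit in a per-server directory under the HBase root, while fully released files are moved to HConstants.HREGION_OLDLOGDIR_NAME. A sketch of resolving both paths; the exact "<root>/WALs/<serverName>" shape of the live directory is an assumption based on how the tests use it:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

    final class WalLayoutSketch {
      // Live WALs for one server (or, in tests, one method name).
      static Path liveWalDir(Path rootDir, String serverName) {
        return new Path(rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName));
      }

      // Rolled WALs that no region pins any more end up here.
      static Path archivedWalDir(Path rootDir) {
        return new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
      }
    }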
@@ -273,7 +275,7 @@ public class TestWALFactory {
    // gives you EOFE.
    wal.sync();
    // Open a Reader.
-    Path walPath = DefaultWALProvider.getCurrentFileName(wal);
+    Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
    reader = wals.createReader(fs, walPath);
    int count = 0;
    WAL.Entry entry = new WAL.Entry();
@@ -397,7 +399,7 @@ public class TestWALFactory {
    // Now call sync to send the data to HDFS datanodes
    wal.sync();
    int namenodePort = cluster.getNameNodePort();
-    final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
+    final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);


    // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster)
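The pattern in these TestWALFactory hunks is always the same: sync the WAL, resolve its current file with getCurrentFileName, then stream entries back through a reader. A compact sketch of that read loop, assuming WAL.Reader is Closeable as its use in these tests suggests:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.wal.WAL;
    import org.apache.hadoop.hbase.wal.WALFactory;

    final class WalReadBackSketch {
      // Counts the entries in a WAL file; each entry pairs a key with a WALEdit.
      static int countEntries(WALFactory wals, FileSystem fs, Path walPath)
          throws Exception {
        int count = 0;
        try (WAL.Reader reader = wals.createReader(fs, walPath)) {
          WAL.Entry entry = new WAL.Entry();
          while (reader.next(entry) != null) {
            count++;
          }
        }
        return count;
      }
    }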
@@ -533,7 +535,7 @@ public class TestWALFactory {
    log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys());
    log.completeCacheFlush(info.getEncodedNameAsBytes());
    log.shutdown();
-    Path filename = DefaultWALProvider.getCurrentFileName(log);
+    Path filename = AbstractFSWALProvider.getCurrentFileName(log);
    // Now open a reader on the log and assert append worked.
    reader = wals.createReader(fs, filename);
    // Above we added all columns on a single row so we only read one
@@ -596,7 +598,7 @@ public class TestWALFactory {
    log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
    log.completeCacheFlush(hri.getEncodedNameAsBytes());
    log.shutdown();
-    Path filename = DefaultWALProvider.getCurrentFileName(log);
+    Path filename = AbstractFSWALProvider.getCurrentFileName(log);
    // Now open a reader on the log and assert append worked.
    reader = wals.createReader(fs, filename);
    WAL.Entry entry = reader.next();
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 // imports for things that haven't moved from regionserver.wal yet.
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
+import org.apache.hadoop.hbase.regionserver.wal.SecureAsyncProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec;
@@ -120,7 +121,7 @@ public class TestWALReaderOnSecureWAL {
          System.currentTimeMillis(), mvcc, scopes), kvs, true);
    }
    wal.sync();
-    final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
+    final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
    wal.shutdown();

    return walPath;
@@ -129,7 +130,7 @@ public class TestWALReaderOnSecureWAL {
      conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName);
    }
  }

  @Test()
  public void testWALReaderOnSecureWAL() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
@@ -137,6 +138,8 @@ public class TestWALReaderOnSecureWAL {
        WAL.Reader.class);
    conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
        WALProvider.Writer.class);
+    conf.setClass("hbase.regionserver.hlog.async.writer.impl", SecureAsyncProtobufLogWriter.class,
+        WALProvider.AsyncWriter.class);
    conf.setBoolean(WAL_ENCRYPTION, true);
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    final WALFactory wals = new WALFactory(conf, null, currentTest.getMethodName());
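With an async WAL implementation now in the provider hierarchy, the secure test setup also has to register an encrypting writer for the async path, which is what the two added lines do. A sketch collecting the whole registration in one place; the literal value of the WAL_ENCRYPTION key is an assumption:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.regionserver.wal.SecureAsyncProtobufLogWriter;
    import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
    import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
    import org.apache.hadoop.hbase.wal.WAL;
    import org.apache.hadoop.hbase.wal.WALProvider;

    final class SecureWalConfigSketch {
      static void enableWalEncryption(Configuration conf) {
        conf.setClass("hbase.regionserver.hlog.reader.impl",
            SecureProtobufLogReader.class, WAL.Reader.class);
        conf.setClass("hbase.regionserver.hlog.writer.impl",
            SecureProtobufLogWriter.class, WALProvider.Writer.class);
        conf.setClass("hbase.regionserver.hlog.async.writer.impl",
            SecureAsyncProtobufLogWriter.class, WALProvider.AsyncWriter.class);
        // Assumed to match the WAL_ENCRYPTION constant imported by the test.
        conf.setBoolean("hbase.regionserver.wal.encryption", true);
      }
    }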
@@ -157,7 +160,7 @@ public class TestWALReaderOnSecureWAL {
    } catch (IOException ioe) {
      // expected IOE
    }

    FileStatus[] listStatus = fs.listStatus(walPath.getParent());
    RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
@@ -174,7 +177,7 @@ public class TestWALReaderOnSecureWAL {
    }
    wals.close();
  }

  @Test()
  public void testSecureWALReaderOnWAL() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
@@ -202,7 +205,7 @@ public class TestWALReaderOnSecureWAL {
    } catch (IOException ioe) {
      assertFalse(true);
    }

    FileStatus[] listStatus = fs.listStatus(walPath.getParent());
    RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
@@ -23,6 +23,10 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -39,7 +43,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;

-import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -66,13 +69,11 @@ import org.apache.hadoop.hbase.regionserver.wal.FaultySequenceFileLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
@@ -95,9 +96,6 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;

-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-
 /**
  * Testing {@link WAL} splitting code.
  */
@@ -191,7 +189,7 @@ public class TestWALSplit {
    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
    wals = new WALFactory(conf, null, name.getMethodName());
-    WALDIR = new Path(HBASEDIR, DefaultWALProvider.getWALDirectoryName(name.getMethodName()));
+    WALDIR = new Path(HBASEDIR, AbstractFSWALProvider.getWALDirectoryName(name.getMethodName()));
    //fs.mkdirs(WALDIR);
  }

@@ -359,7 +359,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
    if (verify) {
      LOG.info("verifying written log entries.");
      Path dir = new Path(FSUtils.getRootDir(getConf()),
-          DefaultWALProvider.getWALDirectoryName("wals"));
+          AbstractFSWALProvider.getWALDirectoryName("wals"));
      long editCount = 0;
      FileStatus [] fsss = fs.listStatus(dir);
      if (fsss.length == 0) throw new IllegalStateException("No WAL found");
@@ -513,7 +513,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
      // We used to do explicit call to rollWriter but changed it to a request
      // to avoid dead lock (there are less threads going on in this class than
      // in the regionserver -- regionserver does not have the issue).
-      DefaultWALProvider.requestLogRoll(wal);
+      AbstractFSWALProvider.requestLogRoll(wal);
    }
  }

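The comment in the last hunk explains the switch from calling rollWriter() directly to requesting a roll: this tool runs far fewer threads than a regionserver, so a synchronous roll can deadlock, while a request merely flags a dedicated roller thread. A toy model of that pattern (illustrative only, not HBase code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicBoolean;

    final class LogRollRequestSketch {
      private final AtomicBoolean rollRequested = new AtomicBoolean(false);
      private final ExecutorService roller = Executors.newSingleThreadExecutor();

      // The appending thread only flips a flag; it never blocks on the roll,
      // so it cannot deadlock against the roll's own locking.
      void requestLogRoll() {
        if (rollRequested.compareAndSet(false, true)) {
          roller.execute(() -> {
            try {
              // ... swap the writer to a fresh file here ...
            } finally {
              rollRequested.set(false);
            }
          });
        }
      }
    }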