HBASE-24408 Introduce a general 'local region' to store data on master (#1753)

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Duo Zhang 2020-05-23 15:59:51 +08:00
parent 950237566b
commit 9f69ad4746
38 changed files with 1703 additions and 781 deletions

View File

@ -125,7 +125,7 @@ possible configurations would overwhelm and obscure the important.
</property>
<property>
<name>hbase.master.logcleaner.plugins</name>
<value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner,org.apache.hadoop.hbase.master.cleaner.TimeToLiveProcedureWALCleaner</value>
<value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner,org.apache.hadoop.hbase.master.cleaner.TimeToLiveProcedureWALCleaner,org.apache.hadoop.hbase.master.cleaner.TimeToLiveMasterLocalStoreWALCleaner</value>
<description>A comma-separated list of BaseLogCleanerDelegate invoked by
the LogsCleaner service. These WAL cleaners are called in order,
so put the cleaner that prunes the most files in front. To
@ -139,16 +139,9 @@ possible configurations would overwhelm and obscure the important.
<description>How long a WAL remain in the archive ({hbase.rootdir}/oldWALs) directory,
after which it will be cleaned by a Master thread. The value is in milliseconds.</description>
</property>
<property>
<name>hbase.master.procedurewalcleaner.ttl</name>
<value>604800000</value>
<description>How long a Procedure WAL will remain in the
archive directory, after which it will be cleaned
by a Master thread. The value is in milliseconds.</description>
</property>
<property>
<name>hbase.master.hfilecleaner.plugins</name>
<value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner</value>
<value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner,org.apache.hadoop.hbase.master.cleaner.TimeToLiveMasterLocalStoreHFileCleaner</value>
<description>A comma-separated list of BaseHFileCleanerDelegate invoked by
the HFileCleaner service. These HFiles cleaners are called in order,
so put the cleaner that prunes the most files in front. To
@ -157,17 +150,6 @@ possible configurations would overwhelm and obscure the important.
default hfile cleaners in the list as they will be overwritten in
hbase-site.xml.</description>
</property>
<property>
<name>hbase.procedure.store.region.hfilecleaner.plugins</name>
<value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner</value>
<description>A comma-separated list of BaseHFileCleanerDelegate invoked by
the RegionProcedureStore HFileCleaner service. These HFiles cleaners are
called in order, so put the cleaner that prunes the most files in front. To
implement your own BaseHFileCleanerDelegate, just put it in HBase's classpath
and add the fully qualified class name here. Always add the above
default hfile cleaners in the list as they will be overwritten in
hbase-site.xml.</description>
</property>
<property>
<name>hbase.master.infoserver.redirect</name>
<value>true</value>

View File

@ -186,7 +186,12 @@ public interface ProcedureStore {
/**
* Acquire the lease for the procedure store.
* @deprecated since 2.3.0, will be removed in 4.0.0 along with
* {@link org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore}. As now we
* will store the procedure data in a master local region, and master itself will deal
* with the lease recovery of the region.
*/
@Deprecated
void recoverLease() throws IOException;
/**

View File

@ -521,12 +521,13 @@ public class FileLink {
/**
* Checks if the specified directory path is a back reference links folder.
*
* @param dirPath Directory path to verify
* @return True if the specified directory is a link references folder
*/
public static boolean isBackReferencesDir(final Path dirPath) {
if (dirPath == null) return false;
if (dirPath == null) {
return false;
}
return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
}

View File

@ -209,7 +209,9 @@ public class HFileLink extends FileLink {
*/
public static boolean isHFileLink(String fileName) {
Matcher m = LINK_NAME_PATTERN.matcher(fileName);
if (!m.matches()) return false;
if (!m.matches()) {
return false;
}
return m.groupCount() > 2 && m.group(4) != null && m.group(3) != null && m.group(2) != null;
}

View File

@ -145,6 +145,7 @@ import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
import org.apache.hadoop.hbase.mob.MobConstants;
@ -434,6 +435,9 @@ public class HMaster extends HRegionServer implements MasterServices {
private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
private ProcedureStore procedureStore;
// the master local storage to store procedure data, etc.
private LocalStore localStore;
// handle table states
private TableStateManager tableStateManager;
@ -887,7 +891,8 @@ public class HMaster extends HRegionServer implements MasterServices {
this.masterActiveTime = System.currentTimeMillis();
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
// always initialize the MemStoreLAB as we use a region to store procedure now.
// always initialize the MemStoreLAB as we use a region to store data in master now, see
// localStore.
initializeMemStoreChunkCreator();
this.fileSystemManager = new MasterFileSystem(conf);
this.walManager = new MasterWalManager(this);
@ -930,6 +935,9 @@ public class HMaster extends HRegionServer implements MasterServices {
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
this.splitWALManager = new SplitWALManager(this);
}
// initialize local store
localStore = LocalStore.create(this);
createProcedureExecutor();
Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType =
procedureExecutor.getActiveProceduresNoCopy().stream()
@ -1412,6 +1420,8 @@ public class HMaster extends HRegionServer implements MasterServices {
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
startProcedureExecutor();
// Create cleaner thread pool
cleanerPool = new DirScanPool(conf);
// Start log cleaner thread
int cleanerInterval =
conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
@ -1501,6 +1511,9 @@ public class HMaster extends HRegionServer implements MasterServices {
stopProcedureExecutor();
if (localStore != null) {
localStore.close(isAborted());
}
if (this.walManager != null) {
this.walManager.stop();
}
@ -1517,10 +1530,8 @@ public class HMaster extends HRegionServer implements MasterServices {
private void createProcedureExecutor() throws IOException {
MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
// Create cleaner thread pool
cleanerPool = new DirScanPool(conf);
procedureStore = new RegionProcedureStore(this, cleanerPool,
new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
procedureStore =
new RegionProcedureStore(this, localStore, new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
procedureStore.registerListener(new ProcedureStoreListener() {
@Override

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class for time to live file cleaner.
*/
@InterfaceAudience.Private
public abstract class BaseTimeToLiveFileCleaner extends BaseLogCleanerDelegate {
private static final Logger LOG =
LoggerFactory.getLogger(BaseTimeToLiveFileCleaner.class.getName());
private static final DateTimeFormatter FORMATTER =
DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneOffset.systemDefault());
// Configured time a log can be kept after it was closed
private long ttlMs;
private volatile boolean stopped = false;
@Override
public final void setConf(Configuration conf) {
super.setConf(conf);
this.ttlMs = getTtlMs(conf);
}
@Override
public boolean isFileDeletable(FileStatus status) {
// Files are validated for the second time here,
// if it causes a bottleneck this logic needs refactored
if (!valiateFilename(status.getPath())) {
return true;
}
long currentTime = EnvironmentEdgeManager.currentTime();
long time = status.getModificationTime();
long life = currentTime - time;
if (LOG.isTraceEnabled()) {
LOG.trace("File life:{}ms, ttl:{}ms, current:{}, from{}", life, ttlMs,
FORMATTER.format(Instant.ofEpochMilli(currentTime)),
FORMATTER.format(Instant.ofEpochMilli(time)));
}
if (life < 0) {
LOG.warn("Found a file ({}) newer than current time ({} < {}), probably a clock skew",
status.getPath(), FORMATTER.format(Instant.ofEpochMilli(currentTime)),
FORMATTER.format(Instant.ofEpochMilli(time)));
return false;
}
return life > ttlMs;
}
@Override
public void stop(String why) {
this.stopped = true;
}
@Override
public boolean isStopped() {
return this.stopped;
}
protected abstract long getTtlMs(Configuration conf);
protected abstract boolean valiateFilename(Path file);
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.StealJobQueue;
import org.apache.yetus.audience.InterfaceAudience;
@ -158,10 +159,9 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
@Override
protected boolean validate(Path file) {
if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
return true;
}
return StoreFileInfo.validateStoreFileName(file.getName());
return HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent()) ||
StoreFileInfo.validateStoreFileName(file.getName()) ||
file.getName().endsWith(LocalStore.ARCHIVED_HFILE_SUFFIX);
}
/**

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -86,8 +87,9 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
@Override
protected boolean validate(Path file) {
return AbstractFSWALProvider.validateWALFilename(file.getName())
|| MasterProcedureUtil.validateProcedureWALFilename(file.getName());
return AbstractFSWALProvider.validateWALFilename(file.getName()) ||
MasterProcedureUtil.validateProcedureWALFilename(file.getName()) ||
file.getName().endsWith(LocalStore.ARCHIVED_WAL_SUFFIX);
}
@Override

View File

@ -17,48 +17,33 @@
*/
package org.apache.hadoop.hbase.master.cleaner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.yetus.audience.InterfaceAudience;
/**
* HFile cleaner that uses the timestamp of the hfile to determine if it should be deleted. By
* default they are allowed to live for {@value #DEFAULT_TTL}
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class TimeToLiveHFileCleaner extends BaseHFileCleanerDelegate {
public class TimeToLiveHFileCleaner extends BaseTimeToLiveFileCleaner {
private static final Logger LOG = LoggerFactory.getLogger(TimeToLiveHFileCleaner.class.getName());
public static final String TTL_CONF_KEY = "hbase.master.hfilecleaner.ttl";
// default ttl = 5 minutes
public static final long DEFAULT_TTL = 60000 * 5;
// Configured time a hfile can be kept after it was moved to the archive
private long ttl;
@Override
public void setConf(Configuration conf) {
this.ttl = conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
super.setConf(conf);
protected long getTtlMs(Configuration conf) {
return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
}
@Override
public boolean isFileDeletable(FileStatus fStat) {
long currentTime = EnvironmentEdgeManager.currentTime();
long time = fStat.getModificationTime();
long life = currentTime - time;
if (LOG.isTraceEnabled()) {
LOG.trace("HFile life:" + life + ", ttl:" + ttl + ", current:" + currentTime + ", from: "
+ time);
}
if (life < 0) {
LOG.warn("Found a hfile (" + fStat.getPath() + ") newer than current time (" + currentTime
+ " < " + time + "), probably a clock skew");
return false;
}
return life > ttl;
protected boolean valiateFilename(Path file) {
return HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent()) ||
StoreFileInfo.validateStoreFileName(file.getName());
}
}

View File

@ -17,66 +17,31 @@
*/
package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Log cleaner that uses the timestamp of the wal to determine if it should
* be deleted. By default they are allowed to live for 10 minutes.
* Log cleaner that uses the timestamp of the wal to determine if it should be deleted. By default
* they are allowed to live for 10 minutes.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class TimeToLiveLogCleaner extends BaseLogCleanerDelegate {
private static final Logger LOG = LoggerFactory.getLogger(TimeToLiveLogCleaner.class.getName());
public class TimeToLiveLogCleaner extends BaseTimeToLiveFileCleaner {
public static final String TTL_CONF_KEY = "hbase.master.logcleaner.ttl";
// default ttl = 10 minutes
public static final long DEFAULT_TTL = 600_000L;
// Configured time a log can be kept after it was closed
private long ttl;
private boolean stopped = false;
@Override
public boolean isFileDeletable(FileStatus fStat) {
// Files are validated for the second time here,
// if it causes a bottleneck this logic needs refactored
if (!AbstractFSWALProvider.validateWALFilename(fStat.getPath().getName())) {
return true;
}
long currentTime = EnvironmentEdgeManager.currentTime();
long time = fStat.getModificationTime();
long life = currentTime - time;
if (LOG.isTraceEnabled()) {
LOG.trace("Log life:" + life + ", ttl:" + ttl + ", current:" + currentTime + ", from: "
+ time);
}
if (life < 0) {
LOG.warn("Found a log (" + fStat.getPath() + ") newer than current time (" + currentTime
+ " < " + time + "), probably a clock skew");
return false;
}
return life > ttl;
protected long getTtlMs(Configuration conf) {
return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
}
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
this.ttl = conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
}
@Override
public void stop(String why) {
this.stopped = true;
}
@Override
public boolean isStopped() {
return this.stopped;
protected boolean valiateFilename(Path file) {
return AbstractFSWALProvider.validateWALFilename(file.getName());
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Master local storage HFile cleaner that uses the timestamp of the HFile to determine if it should
* be deleted. By default they are allowed to live for {@value #DEFAULT_TTL}
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class TimeToLiveMasterLocalStoreHFileCleaner extends BaseTimeToLiveFileCleaner {
public static final String TTL_CONF_KEY = "hbase.master.local.store.hfilecleaner.ttl";
// default ttl = 7 days
public static final long DEFAULT_TTL = 604_800_000L;
@Override
protected long getTtlMs(Configuration conf) {
return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
}
@Override
protected boolean valiateFilename(Path file) {
return file.getName().endsWith(LocalStore.ARCHIVED_HFILE_SUFFIX);
}
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Master local storage WAL cleaner that uses the timestamp of the WAL file to determine if it
* should be deleted. By default they are allowed to live for {@value #DEFAULT_TTL}
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class TimeToLiveMasterLocalStoreWALCleaner extends BaseTimeToLiveFileCleaner {
public static final String TTL_CONF_KEY = "hbase.master.local.store.walcleaner.ttl";
// default ttl = 7 days
public static final long DEFAULT_TTL = 604_800_000L;
@Override
protected long getTtlMs(Configuration conf) {
return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
}
@Override
protected boolean valiateFilename(Path file) {
return file.getName().endsWith(LocalStore.ARCHIVED_WAL_SUFFIX);
}
}

View File

@ -18,65 +18,33 @@
package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Procedure WAL cleaner that uses the timestamp of the Procedure WAL to determine if it should be
* deleted. By default they are allowed to live for {@value #DEFAULT_TTL}
* deleted. By default they are allowed to live for {@value #DEFAULT_TTL}.
* @deprecated Since 2.3.0, will be removed in 4.0.0. We will not use the procedure wal to store
* procedure any more.
*/
@Deprecated
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class TimeToLiveProcedureWALCleaner extends BaseLogCleanerDelegate {
private static final Logger LOG =
LoggerFactory.getLogger(TimeToLiveProcedureWALCleaner.class.getName());
public class TimeToLiveProcedureWALCleaner extends BaseTimeToLiveFileCleaner {
public static final String TTL_CONF_KEY = "hbase.master.procedurewalcleaner.ttl";
// default ttl = 7 days
public static final long DEFAULT_TTL = 604_800_000L;
// Configured time a procedure log can be kept after it was moved to the archive
private long ttl;
private boolean stopped = false;
@Override
public void setConf(Configuration conf) {
this.ttl = conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
super.setConf(conf);
protected long getTtlMs(Configuration conf) {
return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
}
@Override
public boolean isFileDeletable(FileStatus fStat) {
// Files are validated for the second time here,
// if it causes a bottleneck this logic needs refactored
if (!MasterProcedureUtil.validateProcedureWALFilename(fStat.getPath().getName())) {
return true;
}
long currentTime = EnvironmentEdgeManager.currentTime();
long time = fStat.getModificationTime();
long life = currentTime - time;
if (LOG.isTraceEnabled()) {
LOG.trace("Procedure log life:" + life + ", ttl:" + ttl + ", current:" + currentTime +
", from: " + time);
}
if (life < 0) {
LOG.warn("Found a procedure log (" + fStat.getPath() + ") newer than current time ("
+ currentTime + " < " + time + "), probably a clock skew");
return false;
}
return life > ttl;
}
@Override
public void stop(String why) {
this.stopped = true;
}
@Override
public boolean isStopped() {
return this.stopped;
protected boolean valiateFilename(Path file) {
return MasterProcedureUtil.validateProcedureWALFilename(file.getName());
}
}

View File

@ -152,16 +152,13 @@ public final class MasterProcedureUtil {
@Deprecated
private static final Pattern PATTERN = Pattern.compile(".*pv2-\\d{20}.log");
// Use the character $ to let the log cleaner know that this is not the normal wal file.
public static final String ARCHIVED_PROC_WAL_SUFFIX = "$masterproc$";
/**
* A Procedure WAL file name is of the format: pv-&lt;wal-id&gt;.log where wal-id is 20 digits.
* @param filename name of the file to validate
* @return <tt>true</tt> if the filename matches a Procedure WAL, <tt>false</tt> otherwise
*/
public static boolean validateProcedureWALFilename(String filename) {
return PATTERN.matcher(filename).matches() || filename.endsWith(ARCHIVED_PROC_WAL_SUFFIX);
return PATTERN.matcher(filename).matches();
}
/**

View File

@ -0,0 +1,325 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
/**
* A region that stores data in a separated directory.
* <p/>
* FileSystem layout:
*
* <pre>
* hbase
* |
* --&lt;region dir&gt;
* |
* --data
* | |
* | --/&lt;ns&gt/&lt;table&gt/&lt;encoded-region-name&gt; <---- The region data
* | |
* | --replay <---- The edits to replay
* |
* --WALs
* |
* --&lt;master-server-name&gt; <---- The WAL dir for active master
* |
* --&lt;master-server-name&gt;-dead <---- The WAL dir for dead master
* </pre>
*
* Notice that, you can use different root file system and WAL file system. Then the above directory
* will be on two file systems, the root file system will have the data directory while the WAL
* filesystem will have the WALs directory. The archived HFile will be moved to the global HFile
* archived directory with the {@link LocalRegionParams#archivedWalSuffix()} suffix. The archived
* WAL will be moved to the global WAL archived directory with the
* {@link LocalRegionParams#archivedHFileSuffix()} suffix.
*/
@InterfaceAudience.Private
public final class LocalRegion {
private static final Logger LOG = LoggerFactory.getLogger(LocalRegion.class);
private static final String REPLAY_EDITS_DIR = "recovered.wals";
private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
private static final int REGION_ID = 1;
private final WALFactory walFactory;
@VisibleForTesting
final HRegion region;
@VisibleForTesting
final LocalRegionFlusherAndCompactor flusherAndCompactor;
private LocalRegionWALRoller walRoller;
private LocalRegion(HRegion region, WALFactory walFactory,
LocalRegionFlusherAndCompactor flusherAndCompactor, LocalRegionWALRoller walRoller) {
this.region = region;
this.walFactory = walFactory;
this.flusherAndCompactor = flusherAndCompactor;
this.walRoller = walRoller;
}
private void closeRegion(boolean abort) {
try {
region.close(abort);
} catch (IOException e) {
LOG.warn("Failed to close region", e);
}
}
private void shutdownWAL() {
try {
walFactory.shutdown();
} catch (IOException e) {
LOG.warn("Failed to shutdown WAL", e);
}
}
public void update(UpdateLocalRegion action) throws IOException {
action.update(region);
flusherAndCompactor.onUpdate();
}
public Result get(Get get) throws IOException {
return region.get(get);
}
public RegionScanner getScanner(Scan scan) throws IOException {
return region.getScanner(scan);
}
@VisibleForTesting
FlushResult flush(boolean force) throws IOException {
return region.flush(force);
}
@VisibleForTesting
void requestRollAll() {
walRoller.requestRollAll();
}
@VisibleForTesting
void waitUntilWalRollFinished() throws InterruptedException {
walRoller.waitUntilWalRollFinished();
}
public void close(boolean abort) {
LOG.info("Closing local region {}, isAbort={}", region.getRegionInfo(), abort);
if (flusherAndCompactor != null) {
flusherAndCompactor.close();
}
// if abort, we shutdown wal first to fail the ongoing updates to the region, and then close the
// region, otherwise there will be dead lock.
if (abort) {
shutdownWAL();
closeRegion(true);
} else {
closeRegion(false);
shutdownWAL();
}
if (walRoller != null) {
walRoller.close();
}
}
private static WAL createWAL(WALFactory walFactory, LocalRegionWALRoller walRoller,
String serverName, FileSystem walFs, Path walRootDir, RegionInfo regionInfo)
throws IOException {
String logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
Path walDir = new Path(walRootDir, logName);
LOG.debug("WALDir={}", walDir);
if (walFs.exists(walDir)) {
throw new HBaseIOException(
"Already created wal directory at " + walDir + " for local region " + regionInfo);
}
if (!walFs.mkdirs(walDir)) {
throw new IOException(
"Can not create wal directory " + walDir + " for local region " + regionInfo);
}
WAL wal = walFactory.getWAL(regionInfo);
walRoller.addWAL(wal);
return wal;
}
private static HRegion bootstrap(Configuration conf, TableDescriptor td, FileSystem fs,
Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
LocalRegionWALRoller walRoller, String serverName) throws IOException {
TableName tn = td.getTableName();
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setRegionId(REGION_ID).build();
Path tmpTableDir = CommonFSUtils.getTableDir(rootDir,
TableName.valueOf(tn.getNamespaceAsString(), tn.getQualifierAsString() + "-tmp"));
if (fs.exists(tmpTableDir) && !fs.delete(tmpTableDir, true)) {
throw new IOException("Can not delete partial created proc region " + tmpTableDir);
}
HRegion.createHRegion(conf, regionInfo, fs, tmpTableDir, td).close();
Path tableDir = CommonFSUtils.getTableDir(rootDir, tn);
if (!fs.rename(tmpTableDir, tableDir)) {
throw new IOException("Can not rename " + tmpTableDir + " to " + tableDir);
}
WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
}
private static HRegion open(Configuration conf, TableDescriptor td, FileSystem fs, Path rootDir,
FileSystem walFs, Path walRootDir, WALFactory walFactory, LocalRegionWALRoller walRoller,
String serverName) throws IOException {
Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
Path regionDir =
fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
.getPath();
RegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
Path walRegionDir = FSUtils.getRegionDirFromRootDir(walRootDir, regionInfo);
Path replayEditsDir = new Path(walRegionDir, REPLAY_EDITS_DIR);
if (!walFs.exists(replayEditsDir) && !walFs.mkdirs(replayEditsDir)) {
throw new IOException("Failed to create replay directory: " + replayEditsDir);
}
Path walsDir = new Path(walRootDir, HREGION_LOGDIR_NAME);
for (FileStatus walDir : walFs.listStatus(walsDir)) {
if (!walDir.isDirectory()) {
continue;
}
if (walDir.getPath().getName().startsWith(serverName)) {
LOG.warn("This should not happen in real production as we have not created our WAL " +
"directory yet, ignore if you are running a local region related UT");
}
Path deadWALDir;
if (!walDir.getPath().getName().endsWith(DEAD_WAL_DIR_SUFFIX)) {
deadWALDir =
new Path(walDir.getPath().getParent(), walDir.getPath().getName() + DEAD_WAL_DIR_SUFFIX);
if (!walFs.rename(walDir.getPath(), deadWALDir)) {
throw new IOException("Can not rename " + walDir + " to " + deadWALDir +
" when recovering lease of proc store");
}
LOG.info("Renamed {} to {} as it is dead", walDir.getPath(), deadWALDir);
} else {
deadWALDir = walDir.getPath();
LOG.info("{} is already marked as dead", deadWALDir);
}
for (FileStatus walFile : walFs.listStatus(deadWALDir)) {
Path replayEditsFile = new Path(replayEditsDir, walFile.getPath().getName());
RecoverLeaseFSUtils.recoverFileLease(walFs, walFile.getPath(), conf);
if (!walFs.rename(walFile.getPath(), replayEditsFile)) {
throw new IOException("Can not rename " + walFile.getPath() + " to " + replayEditsFile +
" when recovering lease for local region");
}
LOG.info("Renamed {} to {}", walFile.getPath(), replayEditsFile);
}
LOG.info("Delete empty local region wal dir {}", deadWALDir);
walFs.delete(deadWALDir, true);
}
WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
replayEditsDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()).toString());
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
}
public static LocalRegion create(LocalRegionParams params) throws IOException {
TableDescriptor td = params.tableDescriptor();
LOG.info("Create or load local region for table " + td);
Server server = params.server();
Configuration baseConf = server.getConfiguration();
FileSystem fs = CommonFSUtils.getRootDirFileSystem(baseConf);
FileSystem walFs = CommonFSUtils.getWALFileSystem(baseConf);
Path globalRootDir = CommonFSUtils.getRootDir(baseConf);
Path globalWALRootDir = CommonFSUtils.getWALRootDir(baseConf);
Path rootDir = new Path(globalRootDir, params.regionDirName());
Path walRootDir = new Path(globalWALRootDir, params.regionDirName());
// we will override some configurations so create a new one.
Configuration conf = new Configuration(baseConf);
CommonFSUtils.setRootDir(conf, rootDir);
CommonFSUtils.setWALRootDir(conf, walRootDir);
LocalRegionFlusherAndCompactor.setupConf(conf, params.flushSize(), params.flushPerChanges(),
params.flushIntervalMs());
conf.setInt(AbstractFSWAL.MAX_LOGS, params.maxWals());
if (params.useHsync() != null) {
conf.setBoolean(HRegion.WAL_HSYNC_CONF_KEY, params.useHsync());
}
conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT,
IntMath.ceilingPowerOfTwo(params.ringBufferSlotCount()));
LocalRegionWALRoller walRoller = LocalRegionWALRoller.create(td.getTableName() + "-WAL-Roller",
conf, server, walFs, walRootDir, globalWALRootDir, params.archivedWalSuffix(),
params.rollPeriodMs(), params.flushSize());
walRoller.start();
WALFactory walFactory = new WALFactory(conf, server.getServerName().toString());
Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
HRegion region;
if (fs.exists(tableDir)) {
// load the existing region.
region = open(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
server.getServerName().toString());
} else {
// bootstrapping...
region = bootstrap(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
server.getServerName().toString());
}
Path globalArchiveDir = HFileArchiveUtil.getArchivePath(baseConf);
LocalRegionFlusherAndCompactor flusherAndCompactor = new LocalRegionFlusherAndCompactor(conf,
server, region, params.flushSize(), params.flushPerChanges(), params.flushIntervalMs(),
params.compactMin(), globalArchiveDir, params.archivedHFileSuffix());
walRoller.setFlusherAndCompactor(flusherAndCompactor);
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
if (!fs.mkdirs(archiveDir)) {
LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will" +
" be created again when we actually archive the hfiles later, so continue", archiveDir);
}
return new LocalRegion(region, walFactory, flusherAndCompactor, walRoller);
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
package org.apache.hadoop.hbase.master.store;
import java.io.Closeable;
import java.io.IOException;
@ -27,22 +27,26 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* As long as there is no RegionServerServices for the procedure store region, we need implement the
* flush and compaction logic by our own.
* As long as there is no RegionServerServices for a 'local' region, we need implement the flush and
* compaction logic by our own.
* <p/>
* The flush logic is very simple, every time after calling a modification method in
* {@link RegionProcedureStore}, we will call the {@link #onUpdate()} method below, and in this
@ -53,26 +57,11 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* count, if it is above the compactMin, we will do a major compaction.
*/
@InterfaceAudience.Private
class RegionFlusherAndCompactor implements Closeable {
class LocalRegionFlusherAndCompactor implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(RegionFlusherAndCompactor.class);
private static final Logger LOG = LoggerFactory.getLogger(LocalRegionFlusherAndCompactor.class);
static final String FLUSH_SIZE_KEY = "hbase.procedure.store.region.flush.size";
static final long DEFAULT_FLUSH_SIZE = TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE;
static final String FLUSH_PER_CHANGES_KEY = "hbase.procedure.store.region.flush.per.changes";
private static final long DEFAULT_FLUSH_PER_CHANGES = 1_000_000;
static final String FLUSH_INTERVAL_MS_KEY = "hbase.procedure.store.region.flush.interval.ms";
// default to flush every 15 minutes, for safety
private static final long DEFAULT_FLUSH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(15);
static final String COMPACT_MIN_KEY = "hbase.procedure.store.region.compact.min";
private static final int DEFAULT_COMPACT_MIN = 4;
private final Configuration conf;
private final Abortable abortable;
@ -90,6 +79,10 @@ class RegionFlusherAndCompactor implements Closeable {
private final int compactMin;
private final Path globalArchivePath;
private final String archivedHFileSuffix;
private final Thread flushThread;
private final Lock flushLock = new ReentrantLock();
@ -108,38 +101,60 @@ class RegionFlusherAndCompactor implements Closeable {
private volatile boolean closed = false;
RegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region) {
LocalRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region,
long flushSize, long flushPerChanges, long flushIntervalMs, int compactMin,
Path globalArchivePath, String archivedHFileSuffix) {
this.conf = conf;
this.abortable = abortable;
this.region = region;
flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
compactMin = conf.getInt(COMPACT_MIN_KEY, DEFAULT_COMPACT_MIN);
flushThread = new Thread(this::flushLoop, "Procedure-Region-Store-Flusher");
this.flushSize = flushSize;
this.flushPerChanges = flushPerChanges;
this.flushIntervalMs = flushIntervalMs;
this.compactMin = compactMin;
this.globalArchivePath = globalArchivePath;
this.archivedHFileSuffix = archivedHFileSuffix;
flushThread = new Thread(this::flushLoop, region.getRegionInfo().getTable() + "-Flusher");
flushThread.setDaemon(true);
flushThread.start();
compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
.setNameFormat(region.getRegionInfo().getTable() + "-Store-Compactor").setDaemon(true)
.build());
LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}",
flushSize, flushPerChanges, flushIntervalMs, compactMin);
}
// inject our flush related configurations
static void setupConf(Configuration conf) {
long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
static void setupConf(Configuration conf, long flushSize, long flushPerChanges,
long flushIntervalMs) {
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSize);
long flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges);
long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs);
LOG.info("Injected flushSize={}, flushPerChanges={}, flushIntervalMs={}", flushSize,
flushPerChanges, flushIntervalMs);
}
private void moveHFileToGlobalArchiveDir() throws IOException {
FileSystem fs = region.getRegionFileSystem().getFileSystem();
for (HStore store : region.getStores()) {
store.closeAndArchiveCompactedFiles();
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, region.getRegionInfo(),
store.getColumnFamilyDescriptor().getName());
Path globalStoreArchiveDir = HFileArchiveUtil.getStoreArchivePathForArchivePath(
globalArchivePath, region.getRegionInfo(), store.getColumnFamilyDescriptor().getName());
try {
LocalRegionUtils.moveFilesUnderDir(fs, storeArchiveDir, globalStoreArchiveDir,
archivedHFileSuffix);
} catch (IOException e) {
LOG.warn("Failed to move archived hfiles from {} to global dir {}", storeArchiveDir,
globalStoreArchiveDir, e);
}
}
}
private void compact() {
try {
region.compact(true);
Iterables.getOnlyElement(region.getStores()).closeAndArchiveCompactedFiles();
moveHFileToGlobalArchiveDir();
} catch (IOException e) {
LOG.error("Failed to compact procedure store region", e);
}
@ -156,7 +171,12 @@ class RegionFlusherAndCompactor implements Closeable {
}
private boolean needCompaction() {
return Iterables.getOnlyElement(region.getStores()).getStorefilesCount() >= compactMin;
for (Store store : region.getStores()) {
if (store.getStorefilesCount() >= compactMin) {
return true;
}
}
return false;
}
private void flushLoop() {

View File

@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.yetus.audience.InterfaceAudience;
/**
* The parameters for constructing {@link LocalRegion}.
*/
@InterfaceAudience.Private
public class LocalRegionParams {
private Server server;
private String regionDirName;
private TableDescriptor tableDescriptor;
private Long flushSize;
private Long flushPerChanges;
private Long flushIntervalMs;
private Integer compactMin;
private Integer maxWals;
private Boolean useHsync;
private Integer ringBufferSlotCount;
private Long rollPeriodMs;
private String archivedWalSuffix;
private String archivedHFileSuffix;
public LocalRegionParams server(Server server) {
this.server = server;
return this;
}
public LocalRegionParams regionDirName(String regionDirName) {
this.regionDirName = regionDirName;
return this;
}
public LocalRegionParams tableDescriptor(TableDescriptor tableDescriptor) {
this.tableDescriptor = tableDescriptor;
return this;
}
public LocalRegionParams flushSize(long flushSize) {
this.flushSize = flushSize;
return this;
}
public LocalRegionParams flushPerChanges(long flushPerChanges) {
this.flushPerChanges = flushPerChanges;
return this;
}
public LocalRegionParams flushIntervalMs(long flushIntervalMs) {
this.flushIntervalMs = flushIntervalMs;
return this;
}
public LocalRegionParams compactMin(int compactMin) {
this.compactMin = compactMin;
return this;
}
public LocalRegionParams maxWals(int maxWals) {
this.maxWals = maxWals;
return this;
}
public LocalRegionParams useHsync(boolean useHsync) {
this.useHsync = useHsync;
return this;
}
public LocalRegionParams ringBufferSlotCount(int ringBufferSlotCount) {
this.ringBufferSlotCount = ringBufferSlotCount;
return this;
}
public LocalRegionParams rollPeriodMs(long rollPeriodMs) {
this.rollPeriodMs = rollPeriodMs;
return this;
}
public LocalRegionParams archivedWalSuffix(String archivedWalSuffix) {
this.archivedWalSuffix = archivedWalSuffix;
return this;
}
public LocalRegionParams archivedHFileSuffix(String archivedHFileSuffix) {
this.archivedHFileSuffix = archivedHFileSuffix;
return this;
}
public Server server() {
return server;
}
public String regionDirName() {
return regionDirName;
}
public TableDescriptor tableDescriptor() {
return tableDescriptor;
}
public long flushSize() {
return flushSize;
}
public long flushPerChanges() {
return flushPerChanges;
}
public long flushIntervalMs() {
return flushIntervalMs;
}
public int compactMin() {
return compactMin;
}
public int maxWals() {
return maxWals;
}
public Boolean useHsync() {
return useHsync;
}
public int ringBufferSlotCount() {
return ringBufferSlotCount;
}
public long rollPeriodMs() {
return rollPeriodMs;
}
public String archivedWalSuffix() {
return archivedWalSuffix;
}
public String archivedHFileSuffix() {
return archivedHFileSuffix;
}
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
final class LocalRegionUtils {
private static final Logger LOG = LoggerFactory.getLogger(LocalRegionUtils.class);
private LocalRegionUtils() {
}
static void moveFilesUnderDir(FileSystem fs, Path src, Path dst, String suffix)
throws IOException {
if (!fs.exists(dst) && !fs.mkdirs(dst)) {
LOG.warn("Failed to create dir {}", dst);
return;
}
FileStatus[] archivedWALFiles = fs.listStatus(src);
if (archivedWALFiles == null) {
return;
}
for (FileStatus status : archivedWALFiles) {
Path file = status.getPath();
Path newFile = new Path(dst, file.getName() + suffix);
if (fs.rename(file, newFile)) {
LOG.info("Moved {} to {}", file, newFile);
} else {
LOG.warn("Failed to move from {} to {}", file, newFile);
}
}
}
}

View File

@ -15,18 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
package org.apache.hadoop.hbase.master.store;
import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@ -38,23 +35,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* As long as there is no RegionServerServices for the procedure store region, we need implement log
* roller logic by our own.
* As long as there is no RegionServerServices for a local region, we need implement log roller
* logic by our own.
* <p/>
* We can reuse most of the code for normal wal roller, the only difference is that there is only
* one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the
* procedure store region.
*/
@InterfaceAudience.Private
final class RegionProcedureStoreWALRoller extends AbstractWALRoller<Abortable> {
public final class LocalRegionWALRoller extends AbstractWALRoller<Abortable> {
private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStoreWALRoller.class);
private static final Logger LOG = LoggerFactory.getLogger(LocalRegionWALRoller.class);
static final String ROLL_PERIOD_MS_KEY = "hbase.procedure.store.region.walroll.period.ms";
private static final long DEFAULT_ROLL_PERIOD_MS = TimeUnit.MINUTES.toMillis(15);
private volatile RegionFlusherAndCompactor flusherAndCompactor;
private volatile LocalRegionFlusherAndCompactor flusherAndCompactor;
private final FileSystem fs;
@ -62,36 +55,22 @@ final class RegionProcedureStoreWALRoller extends AbstractWALRoller<Abortable> {
private final Path globalWALArchiveDir;
private RegionProcedureStoreWALRoller(Configuration conf, Abortable abortable, FileSystem fs,
Path walRootDir, Path globalWALRootDir) {
super("RegionProcedureStoreWALRoller", conf, abortable);
private final String archivedWALSuffix;
private LocalRegionWALRoller(String name, Configuration conf, Abortable abortable, FileSystem fs,
Path walRootDir, Path globalWALRootDir, String archivedWALSuffix) {
super(name, conf, abortable);
this.fs = fs;
this.walArchiveDir = new Path(walRootDir, HREGION_OLDLOGDIR_NAME);
this.globalWALArchiveDir = new Path(globalWALRootDir, HREGION_OLDLOGDIR_NAME);
this.archivedWALSuffix = archivedWALSuffix;
}
@Override
protected void afterRoll(WAL wal) {
// move the archived WAL files to the global archive path
try {
if (!fs.exists(globalWALArchiveDir) && !fs.mkdirs(globalWALArchiveDir)) {
LOG.warn("Failed to create global archive dir {}", globalWALArchiveDir);
return;
}
FileStatus[] archivedWALFiles = fs.listStatus(walArchiveDir);
if (archivedWALFiles == null) {
return;
}
for (FileStatus status : archivedWALFiles) {
Path file = status.getPath();
Path newFile = new Path(globalWALArchiveDir,
file.getName() + MasterProcedureUtil.ARCHIVED_PROC_WAL_SUFFIX);
if (fs.rename(file, newFile)) {
LOG.info("Moved {} to {}", file, newFile);
} else {
LOG.warn("Failed to move archived wal from {} to global place {}", file, newFile);
}
}
LocalRegionUtils.moveFilesUnderDir(fs, walArchiveDir, globalWALArchiveDir, archivedWALSuffix);
} catch (IOException e) {
LOG.warn("Failed to move archived wals from {} to global dir {}", walArchiveDir,
globalWALArchiveDir, e);
@ -100,28 +79,29 @@ final class RegionProcedureStoreWALRoller extends AbstractWALRoller<Abortable> {
@Override
protected void scheduleFlush(String encodedRegionName) {
RegionFlusherAndCompactor flusher = this.flusherAndCompactor;
LocalRegionFlusherAndCompactor flusher = this.flusherAndCompactor;
if (flusher != null) {
flusher.requestFlush();
}
}
void setFlusherAndCompactor(RegionFlusherAndCompactor flusherAndCompactor) {
void setFlusherAndCompactor(LocalRegionFlusherAndCompactor flusherAndCompactor) {
this.flusherAndCompactor = flusherAndCompactor;
}
static RegionProcedureStoreWALRoller create(Configuration conf, Abortable abortable,
FileSystem fs, Path walRootDir, Path globalWALRootDir) {
static LocalRegionWALRoller create(String name, Configuration conf, Abortable abortable,
FileSystem fs, Path walRootDir, Path globalWALRootDir, String archivedWALSuffix,
long rollPeriodMs, long flushSize) {
// we can not run with wal disabled, so force set it to true.
conf.setBoolean(WALFactory.WAL_ENABLED, true);
// we do not need this feature, so force disable it.
conf.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, false);
conf.setLong(WAL_ROLL_PERIOD_KEY, conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS));
long flushSize = conf.getLong(RegionFlusherAndCompactor.FLUSH_SIZE_KEY,
RegionFlusherAndCompactor.DEFAULT_FLUSH_SIZE);
conf.setLong(WAL_ROLL_PERIOD_KEY, rollPeriodMs);
// make the roll size the same with the flush size, as we only have one region here
conf.setLong(WALUtil.WAL_BLOCK_SIZE, flushSize * 2);
conf.setFloat(AbstractFSWAL.WAL_ROLL_MULTIPLIER, 0.5f);
return new RegionProcedureStoreWALRoller(conf, abortable, fs, walRootDir, globalWALRootDir);
return new LocalRegionWALRoller(name, conf, abortable, fs, walRootDir, globalWALRootDir,
archivedWALSuffix);
}
}

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* Used for storing data at master side. The data will be stored in a {@link LocalRegion}.
*/
@InterfaceAudience.Private
public final class LocalStore {
// Use the character $ to let the log cleaner know that this is not the normal wal file.
public static final String ARCHIVED_WAL_SUFFIX = "$masterlocalwal$";
// this is a bit trick that in StoreFileInfo.validateStoreFileName, we just test if the file name
// contains '-' to determine if it is a valid store file, so here we have to add '-'in the file
// name to avoid being processed by normal TimeToLiveHFileCleaner.
public static final String ARCHIVED_HFILE_SUFFIX = "$-masterlocalhfile-$";
private static final String MAX_WALS_KEY = "hbase.master.store.region.maxwals";
private static final int DEFAULT_MAX_WALS = 10;
public static final String USE_HSYNC_KEY = "hbase.master.store.region.wal.hsync";
public static final String MASTER_STORE_DIR = "MasterData";
private static final String FLUSH_SIZE_KEY = "hbase.master.store.region.flush.size";
private static final long DEFAULT_FLUSH_SIZE = TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE;
private static final String FLUSH_PER_CHANGES_KEY = "hbase.master.store.region.flush.per.changes";
private static final long DEFAULT_FLUSH_PER_CHANGES = 1_000_000;
private static final String FLUSH_INTERVAL_MS_KEY = "hbase.master.store.region.flush.interval.ms";
// default to flush every 15 minutes, for safety
private static final long DEFAULT_FLUSH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(15);
private static final String COMPACT_MIN_KEY = "hbase.master.store.region.compact.min";
private static final int DEFAULT_COMPACT_MIN = 4;
private static final String ROLL_PERIOD_MS_KEY = "hbase.master.store.region.walroll.period.ms";
private static final long DEFAULT_ROLL_PERIOD_MS = TimeUnit.MINUTES.toMillis(15);
private static final String RING_BUFFER_SLOT_COUNT = "hbase.master.store.ringbuffer.slot.count";
private static final int DEFAULT_RING_BUFFER_SLOT_COUNT = 128;
public static final TableName TABLE_NAME = TableName.valueOf("master:store");
public static final byte[] PROC_FAMILY = Bytes.toBytes("proc");
private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(PROC_FAMILY)).build();
private final LocalRegion region;
private LocalStore(LocalRegion region) {
this.region = region;
}
public void update(UpdateLocalRegion action) throws IOException {
region.update(action);
}
public Result get(Get get) throws IOException {
return region.get(get);
}
public RegionScanner getScanner(Scan scan) throws IOException {
return region.getScanner(scan);
}
public void close(boolean abort) {
region.close(abort);
}
@VisibleForTesting
public FlushResult flush(boolean force) throws IOException {
return region.flush(force);
}
@VisibleForTesting
public void requestRollAll() {
region.requestRollAll();
}
@VisibleForTesting
public void waitUntilWalRollFinished() throws InterruptedException {
region.waitUntilWalRollFinished();
}
public static LocalStore create(Server server) throws IOException {
LocalRegionParams params = new LocalRegionParams().server(server)
.regionDirName(MASTER_STORE_DIR).tableDescriptor(TABLE_DESC);
Configuration conf = server.getConfiguration();
long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
long flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
int compactMin = conf.getInt(COMPACT_MIN_KEY, DEFAULT_COMPACT_MIN);
params.flushSize(flushSize).flushPerChanges(flushPerChanges).flushIntervalMs(flushIntervalMs)
.compactMin(compactMin);
int maxWals = conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS);
params.maxWals(maxWals);
if (conf.get(USE_HSYNC_KEY) != null) {
params.useHsync(conf.getBoolean(USE_HSYNC_KEY, false));
}
params.ringBufferSlotCount(conf.getInt(RING_BUFFER_SLOT_COUNT, DEFAULT_RING_BUFFER_SLOT_COUNT));
long rollPeriodMs = conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS);
params.rollPeriodMs(rollPeriodMs).archivedWalSuffix(ARCHIVED_WAL_SUFFIX)
.archivedHFileSuffix(ARCHIVED_HFILE_SUFFIX);
LocalRegion region = LocalRegion.create(params);
return new LocalStore(region);
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
@FunctionalInterface
public interface UpdateLocalRegion {
void update(HRegion region) throws IOException;
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
@ -76,16 +77,15 @@ public class HFileProcedurePrettyPrinter extends AbstractHBaseTool {
addOptWithArg("w", "seekToPid", "Seek to this procedure id and print this procedure only");
OptionGroup files = new OptionGroup();
files.addOption(new Option("f", "file", true,
"File to scan. Pass full-path; e.g. hdfs://a:9000/MasterProcs/master/procedure/p/xxx"));
"File to scan. Pass full-path; e.g. hdfs://a:9000/MasterProcs/master/local/proc/xxx"));
files.addOption(new Option("a", "all", false, "Scan the whole procedure region."));
files.setRequired(true);
options.addOptionGroup(files);
}
private void addAllHFiles() throws IOException {
Path masterProcDir =
new Path(CommonFSUtils.getWALRootDir(conf), RegionProcedureStore.MASTER_PROCEDURE_DIR);
Path tableDir = CommonFSUtils.getTableDir(masterProcDir, RegionProcedureStore.TABLE_NAME);
Path masterProcDir = new Path(CommonFSUtils.getRootDir(conf), LocalStore.MASTER_STORE_DIR);
Path tableDir = CommonFSUtils.getTableDir(masterProcDir, LocalStore.TABLE_NAME);
FileSystem fs = tableDir.getFileSystem(conf);
Path regionDir =
fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]

View File

@ -18,8 +18,8 @@
package org.apache.hadoop.hbase.procedure2.store.region;
import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
import static org.apache.hadoop.hbase.master.store.LocalStore.PROC_FAMILY;
import java.io.IOException;
import java.io.UncheckedIOException;
@ -30,86 +30,48 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
import org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
* A procedure store which uses a region to store all the procedures.
* A procedure store which uses the master local store to store all the procedures.
* <p/>
* FileSystem layout:
*
* <pre>
* hbase
* |
* --MasterProcs
* |
* --data
* | |
* | --/master/procedure/&lt;encoded-region-name&gt; <---- The region data
* | |
* | --replay <---- The edits to replay
* |
* --WALs
* |
* --&lt;master-server-name&gt; <---- The WAL dir for active master
* |
* --&lt;master-server-name&gt;-dead <---- The WAL dir dead master
* </pre>
*
* We use p:d column to store the serialized protobuf format procedure, and when deleting we will
* We use proc:d column to store the serialized protobuf format procedure, and when deleting we will
* first fill the info:proc column with an empty byte array, and then actually delete them in the
* {@link #cleanup()} method. This is because that we need to retain the max procedure id, so we can
* not directly delete a procedure row as we do not know if it is the one with the max procedure id.
@ -119,58 +81,20 @@ public class RegionProcedureStore extends ProcedureStoreBase {
private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStore.class);
static final String MAX_WALS_KEY = "hbase.procedure.store.region.maxwals";
private static final int DEFAULT_MAX_WALS = 10;
static final String USE_HSYNC_KEY = "hbase.procedure.store.region.wal.hsync";
static final String MASTER_PROCEDURE_DIR = "MasterProcs";
static final String HFILECLEANER_PLUGINS = "hbase.procedure.store.region.hfilecleaner.plugins";
private static final String REPLAY_EDITS_DIR = "recovered.wals";
private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
static final TableName TABLE_NAME = TableName.valueOf("master:procedure");
static final byte[] FAMILY = Bytes.toBytes("p");
static final byte[] PROC_QUALIFIER = Bytes.toBytes("d");
private static final int REGION_ID = 1;
private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
private final Server server;
private final DirScanPool cleanerPool;
private final LeaseRecovery leaseRecovery;
// Used to delete the compacted hfiles. Since we put all data on WAL filesystem, it is not
// possible to move the compacted hfiles to the global hfile archive directory, we have to do it
// by ourselves.
private HFileCleaner cleaner;
private WALFactory walFactory;
@VisibleForTesting
HRegion region;
@VisibleForTesting
RegionFlusherAndCompactor flusherAndCompactor;
@VisibleForTesting
RegionProcedureStoreWALRoller walRoller;
final LocalStore localStore;
private int numThreads;
public RegionProcedureStore(Server server, DirScanPool cleanerPool, LeaseRecovery leaseRecovery) {
public RegionProcedureStore(Server server, LocalStore localStore, LeaseRecovery leaseRecovery) {
this.server = server;
this.cleanerPool = cleanerPool;
this.localStore = localStore;
this.leaseRecovery = leaseRecovery;
}
@ -183,52 +107,12 @@ public class RegionProcedureStore extends ProcedureStoreBase {
this.numThreads = numThreads;
}
private void shutdownWAL() {
if (walFactory != null) {
try {
walFactory.shutdown();
} catch (IOException e) {
LOG.warn("Failed to shutdown WAL", e);
}
}
}
private void closeRegion(boolean abort) {
if (region != null) {
try {
region.close(abort);
} catch (IOException e) {
LOG.warn("Failed to close region", e);
}
}
}
@Override
public void stop(boolean abort) {
if (!setRunning(false)) {
return;
}
LOG.info("Stopping the Region Procedure Store, isAbort={}", abort);
if (cleaner != null) {
cleaner.cancel(abort);
}
if (flusherAndCompactor != null) {
flusherAndCompactor.close();
}
// if abort, we shutdown wal first to fail the ongoing updates to the region, and then close the
// region, otherwise there will be dead lock.
if (abort) {
shutdownWAL();
closeRegion(true);
} else {
closeRegion(false);
shutdownWAL();
}
if (walRoller != null) {
walRoller.close();
}
}
@Override
@ -242,91 +126,6 @@ public class RegionProcedureStore extends ProcedureStoreBase {
return count;
}
private WAL createWAL(FileSystem fs, Path rootDir, RegionInfo regionInfo) throws IOException {
String logName = AbstractFSWALProvider.getWALDirectoryName(server.getServerName().toString());
Path walDir = new Path(rootDir, logName);
LOG.debug("WALDir={}", walDir);
if (fs.exists(walDir)) {
throw new HBaseIOException(
"Master procedure store has already created directory at " + walDir);
}
if (!fs.mkdirs(walDir)) {
throw new IOException("Can not create master procedure wal directory " + walDir);
}
WAL wal = walFactory.getWAL(regionInfo);
walRoller.addWAL(wal);
return wal;
}
private HRegion bootstrap(Configuration conf, FileSystem fs, Path rootDir) throws IOException {
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(TABLE_NAME).setRegionId(REGION_ID).build();
Path tmpTableDir = CommonFSUtils.getTableDir(rootDir, TableName
.valueOf(TABLE_NAME.getNamespaceAsString(), TABLE_NAME.getQualifierAsString() + "-tmp"));
if (fs.exists(tmpTableDir) && !fs.delete(tmpTableDir, true)) {
throw new IOException("Can not delete partial created proc region " + tmpTableDir);
}
HRegion.createHRegion(conf, regionInfo, fs, tmpTableDir, TABLE_DESC).close();
Path tableDir = CommonFSUtils.getTableDir(rootDir, TABLE_NAME);
if (!fs.rename(tmpTableDir, tableDir)) {
throw new IOException("Can not rename " + tmpTableDir + " to " + tableDir);
}
WAL wal = createWAL(fs, rootDir, regionInfo);
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
null);
}
private HRegion open(Configuration conf, FileSystem fs, Path rootDir) throws IOException {
String factoryId = server.getServerName().toString();
Path tableDir = CommonFSUtils.getTableDir(rootDir, TABLE_NAME);
Path regionDir =
fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
.getPath();
Path replayEditsDir = new Path(regionDir, REPLAY_EDITS_DIR);
if (!fs.exists(replayEditsDir) && !fs.mkdirs(replayEditsDir)) {
throw new IOException("Failed to create replay directory: " + replayEditsDir);
}
Path walsDir = new Path(rootDir, HREGION_LOGDIR_NAME);
for (FileStatus walDir : fs.listStatus(walsDir)) {
if (!walDir.isDirectory()) {
continue;
}
if (walDir.getPath().getName().startsWith(factoryId)) {
LOG.warn("This should not happen in real production as we have not created our WAL " +
"directory yet, ignore if you are running a procedure related UT");
}
Path deadWALDir;
if (!walDir.getPath().getName().endsWith(DEAD_WAL_DIR_SUFFIX)) {
deadWALDir =
new Path(walDir.getPath().getParent(), walDir.getPath().getName() + DEAD_WAL_DIR_SUFFIX);
if (!fs.rename(walDir.getPath(), deadWALDir)) {
throw new IOException("Can not rename " + walDir + " to " + deadWALDir +
" when recovering lease of proc store");
}
LOG.info("Renamed {} to {} as it is dead", walDir.getPath(), deadWALDir);
} else {
deadWALDir = walDir.getPath();
LOG.info("{} is already marked as dead", deadWALDir);
}
for (FileStatus walFile : fs.listStatus(deadWALDir)) {
Path replayEditsFile = new Path(replayEditsDir, walFile.getPath().getName());
leaseRecovery.recoverFileLease(fs, walFile.getPath());
if (!fs.rename(walFile.getPath(), replayEditsFile)) {
throw new IOException("Can not rename " + walFile.getPath() + " to " + replayEditsFile +
" when recovering lease of proc store");
}
LOG.info("Renamed {} to {}", walFile.getPath(), replayEditsFile);
}
LOG.info("Delete empty proc wal dir {}", deadWALDir);
fs.delete(deadWALDir, true);
}
RegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
WAL wal = createWAL(fs, rootDir, regionInfo);
conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
replayEditsDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
null);
}
@SuppressWarnings("deprecation")
private static final ImmutableSet<Class<?>> UNSUPPORTED_PROCEDURES =
ImmutableSet.of(RecoverMetaProcedure.class, AssignProcedure.class, UnassignProcedure.class,
@ -437,8 +236,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
if (maxProcIdSet.longValue() > 0) {
// let's add a fake row to retain the max proc id
region.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())).addColumn(FAMILY,
PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
localStore.update(r -> r.put(new Put(Bytes.toBytes(maxProcIdSet.longValue()))
.addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)));
}
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
@ -453,46 +252,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
@Override
public void recoverLease() throws IOException {
LOG.debug("Starting Region Procedure Store lease recovery...");
Configuration baseConf = server.getConfiguration();
FileSystem fs = CommonFSUtils.getWALFileSystem(baseConf);
Path globalWALRootDir = CommonFSUtils.getWALRootDir(baseConf);
Path rootDir = new Path(globalWALRootDir, MASTER_PROCEDURE_DIR);
// we will override some configurations so create a new one.
Configuration conf = new Configuration(baseConf);
CommonFSUtils.setRootDir(conf, rootDir);
CommonFSUtils.setWALRootDir(conf, rootDir);
RegionFlusherAndCompactor.setupConf(conf);
conf.setInt(AbstractFSWAL.MAX_LOGS, conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS));
if (conf.get(USE_HSYNC_KEY) != null) {
conf.set(HRegion.WAL_HSYNC_CONF_KEY, conf.get(USE_HSYNC_KEY));
}
conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT, IntMath.ceilingPowerOfTwo(16 * numThreads));
walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
walRoller.start();
walFactory = new WALFactory(conf, server.getServerName().toString());
Path tableDir = CommonFSUtils.getTableDir(rootDir, TABLE_NAME);
if (fs.exists(tableDir)) {
// load the existing region.
region = open(conf, fs, rootDir);
} else {
// bootstrapping...
region = bootstrap(conf, fs, rootDir);
}
flusherAndCompactor = new RegionFlusherAndCompactor(conf, server, region);
walRoller.setFlusherAndCompactor(flusherAndCompactor);
int cleanerInterval = conf.getInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL,
HMaster.DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
if (!fs.mkdirs(archiveDir)) {
LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will" +
" be created again when we actually archive the hfiles later, so continue", archiveDir);
}
cleaner = new HFileCleaner("RegionProcedureStoreHFileCleaner", cleanerInterval, server, conf,
fs, archiveDir, HFILECLEANER_PLUGINS, cleanerPool, Collections.emptyMap());
server.getChoreService().scheduleChore(cleaner);
LOG.info("Starting Region Procedure Store lease recovery...");
FileSystem fs = CommonFSUtils.getWALFileSystem(server.getConfiguration());
tryMigrate(fs);
}
@ -501,7 +262,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
List<ProcedureProtos.Procedure> procs = new ArrayList<>();
long maxProcId = 0;
try (RegionScanner scanner = region.getScanner(new Scan().addColumn(FAMILY, PROC_QUALIFIER))) {
try (RegionScanner scanner =
localStore.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER))) {
List<Cell> cells = new ArrayList<>();
boolean moreRows;
do {
@ -530,7 +292,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
throws IOException {
ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
byte[] row = Bytes.toBytes(proc.getProcId());
mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, proto.toByteArray()));
mutations.add(new Put(row).addColumn(PROC_FAMILY, PROC_QUALIFIER, proto.toByteArray()));
rowsToLock.add(row);
}
@ -538,7 +300,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
// the proc column with an empty array.
private void serializeDelete(long procId, List<Mutation> mutations, List<byte[]> rowsToLock) {
byte[] row = Bytes.toBytes(procId);
mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
mutations.add(new Put(row).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
rowsToLock.add(row);
}
@ -571,14 +333,13 @@ public class RegionProcedureStore extends ProcedureStoreBase {
for (Procedure<?> subProc : subProcs) {
serializePut(subProc, mutations, rowsToLock);
}
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
Arrays.toString(subProcs), e);
throw new UncheckedIOException(e);
}
});
flusherAndCompactor.onUpdate();
}
@Override
@ -590,13 +351,12 @@ public class RegionProcedureStore extends ProcedureStoreBase {
for (Procedure<?> proc : procs) {
serializePut(proc, mutations, rowsToLock);
}
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
throw new UncheckedIOException(e);
}
});
flusherAndCompactor.onUpdate();
}
@Override
@ -604,26 +364,24 @@ public class RegionProcedureStore extends ProcedureStoreBase {
runWithoutRpcCall(() -> {
try {
ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
proto.toByteArray()));
localStore.update(r -> r.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(PROC_FAMILY,
PROC_QUALIFIER, proto.toByteArray())));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
throw new UncheckedIOException(e);
}
});
flusherAndCompactor.onUpdate();
}
@Override
public void delete(long procId) {
try {
region
.put(new Put(Bytes.toBytes(procId)).addColumn(FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
localStore.update(r -> r.put(
new Put(Bytes.toBytes(procId)).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", procId, e);
throw new UncheckedIOException(e);
}
flusherAndCompactor.onUpdate();
}
@Override
@ -635,13 +393,12 @@ public class RegionProcedureStore extends ProcedureStoreBase {
for (long subProcId : subProcIds) {
serializeDelete(subProcId, mutations, rowsToLock);
}
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", parentProc,
Arrays.toString(subProcIds), e);
throw new UncheckedIOException(e);
}
flusherAndCompactor.onUpdate();
}
@Override
@ -660,12 +417,11 @@ public class RegionProcedureStore extends ProcedureStoreBase {
serializeDelete(procId, mutations, rowsToLock);
}
try {
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", Arrays.toString(procIds), e);
throw new UncheckedIOException(e);
}
flusherAndCompactor.onUpdate();
}
@Override
@ -673,7 +429,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
// actually delete the procedures if it is not the one with the max procedure id.
List<Cell> cells = new ArrayList<Cell>();
try (RegionScanner scanner =
region.getScanner(new Scan().addColumn(FAMILY, PROC_QUALIFIER).setReversed(true))) {
localStore.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER).setReversed(true))) {
// skip the row with max procedure id
boolean moreRows = scanner.next(cells);
if (cells.isEmpty()) {
@ -688,7 +444,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
Cell cell = cells.get(0);
cells.clear();
if (cell.getValueLength() == 0) {
region.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
localStore.update(r -> r
.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())));
}
}
} catch (IOException e) {

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.procedure2.store.region;
import static org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore.FAMILY;
import static org.apache.hadoop.hbase.master.store.LocalStore.PROC_FAMILY;
import java.io.PrintStream;
import java.time.Instant;
@ -102,8 +102,8 @@ public class WALProcedurePrettyPrinter extends AbstractHBaseTool {
String.format(KEY_TMPL, sequenceId, FORMATTER.format(Instant.ofEpochMilli(writeTime))));
for (Cell cell : edit.getCells()) {
Map<String, Object> op = WALPrettyPrinter.toStringMap(cell);
if (!Bytes.equals(FAMILY, 0, FAMILY.length, cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength())) {
if (!Bytes.equals(PROC_FAMILY, 0, PROC_FAMILY.length, cell.getFamilyArray(),
cell.getFamilyOffset(), cell.getFamilyLength())) {
// We could have cells other than procedure edits, for example, a flush marker
WALPrettyPrinter.printCell(out, op, false);
continue;

View File

@ -42,12 +42,11 @@ public final class HFileArchiveUtil {
* @param tableName table name under which the store currently lives
* @param regionName region encoded name under which the store currently lives
* @param familyName name of the family in the store
* @return {@link Path} to the directory to archive the given store or
* <tt>null</tt> if it should not be archived
* @return {@link Path} to the directory to archive the given store or <tt>null</tt> if it should
* not be archived
*/
public static Path getStoreArchivePath(final Configuration conf,
final TableName tableName,
final String regionName, final String familyName) throws IOException {
public static Path getStoreArchivePath(final Configuration conf, final TableName tableName,
final String regionName, final String familyName) throws IOException {
Path tableArchiveDir = getTableArchivePath(conf, tableName);
return HStore.getStoreHomedir(tableArchiveDir, regionName, Bytes.toBytes(familyName));
}
@ -61,10 +60,8 @@ public final class HFileArchiveUtil {
* @return {@link Path} to the directory to archive the given store or <tt>null</tt> if it should
* not be archived
*/
public static Path getStoreArchivePath(Configuration conf,
RegionInfo region,
Path tabledir,
byte[] family) throws IOException {
public static Path getStoreArchivePath(Configuration conf, RegionInfo region, Path tabledir,
byte[] family) throws IOException {
return getStoreArchivePath(conf, region, family);
}
@ -84,22 +81,27 @@ public final class HFileArchiveUtil {
}
/**
* Gets the archive directory under specified root dir. One scenario where this is useful is
* when WAL and root dir are configured under different file systems,
* i.e. root dir on S3 and WALs on HDFS.
* This is mostly useful for archiving recovered edits, when
* Gets the archive directory under specified root dir. One scenario where this is useful is when
* WAL and root dir are configured under different file systems, i.e. root dir on S3 and WALs on
* HDFS. This is mostly useful for archiving recovered edits, when
* <b>hbase.region.archive.recovered.edits</b> is enabled.
* @param rootDir {@link Path} the root dir under which archive path should be created.
* @param region parent region information under which the store currently lives
* @param family name of the family in the store
* @return {@link Path} to the WAL FS directory to archive the given store
* or <tt>null</tt> if it should not be archived
* @return {@link Path} to the WAL FS directory to archive the given store or <tt>null</tt> if it
* should not be archived
*/
public static Path getStoreArchivePathForRootDir(Path rootDir, RegionInfo region, byte[] family) {
Path tableArchiveDir = getTableArchivePath(rootDir, region.getTable());
return HStore.getStoreHomedir(tableArchiveDir, region, family);
}
public static Path getStoreArchivePathForArchivePath(Path archivePath, RegionInfo region,
byte[] family) {
Path tableArchiveDir = CommonFSUtils.getTableDir(archivePath, region.getTable());
return HStore.getStoreHomedir(tableArchiveDir, region, family);
}
/**
* Get the archive directory for a given region under the specified table
* @param tableName the table name. Cannot be null.
@ -107,9 +109,7 @@ public final class HFileArchiveUtil {
* @return {@link Path} to the directory to archive the given region, or <tt>null</tt> if it
* should not be archived
*/
public static Path getRegionArchiveDir(Path rootDir,
TableName tableName,
Path regiondir) {
public static Path getRegionArchiveDir(Path rootDir, TableName tableName, Path regiondir) {
// get the archive directory for a table
Path archiveDir = getTableArchivePath(rootDir, tableName);
@ -126,8 +126,8 @@ public final class HFileArchiveUtil {
* @return {@link Path} to the directory to archive the given region, or <tt>null</tt> if it
* should not be archived
*/
public static Path getRegionArchiveDir(Path rootDir,
TableName tableName, String encodedRegionName) {
public static Path getRegionArchiveDir(Path rootDir, TableName tableName,
String encodedRegionName) {
// get the archive directory for a table
Path archiveDir = getTableArchivePath(rootDir, tableName);
return HRegion.getRegionDir(archiveDir, encodedRegionName);

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.After;
import org.junit.Before;
public class LocalRegionTestBase {
protected HBaseCommonTestingUtility htu;
protected LocalRegion region;
protected ChoreService choreService;
protected DirScanPool cleanerPool;
protected static byte[] CF1 = Bytes.toBytes("f1");
protected static byte[] CF2 = Bytes.toBytes("f2");
protected static byte[] QUALIFIER = Bytes.toBytes("q");
protected static String REGION_DIR_NAME = "local";
protected static TableDescriptor TD =
TableDescriptorBuilder.newBuilder(TableName.valueOf("test:local"))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF2)).build();
protected void configure(Configuration conf) throws IOException {
}
protected void configure(LocalRegionParams params) {
}
protected void postSetUp() throws IOException {
}
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
htu.getConfiguration().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
// Runs on local filesystem. Test does not need sync. Turn off checks.
htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
configure(htu.getConfiguration());
choreService = new ChoreService(getClass().getSimpleName());
cleanerPool = new DirScanPool(htu.getConfiguration());
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(htu.getConfiguration());
when(server.getServerName())
.thenReturn(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
when(server.getChoreService()).thenReturn(choreService);
Path testDir = htu.getDataTestDir();
CommonFSUtils.setRootDir(htu.getConfiguration(), testDir);
LocalRegionParams params = new LocalRegionParams();
params.server(server).regionDirName(REGION_DIR_NAME).tableDescriptor(TD)
.flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
.flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(4).maxWals(32).useHsync(false)
.ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
.archivedWalSuffix(LocalStore.ARCHIVED_WAL_SUFFIX)
.archivedHFileSuffix(LocalStore.ARCHIVED_HFILE_SUFFIX);
configure(params);
region = LocalRegion.create(params);
postSetUp();
}
@After
public void tearDown() throws IOException {
region.close(true);
cleanerPool.shutdownNow();
choreService.shutdown();
htu.cleanupTestDir();
}
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveMasterLocalStoreHFileCleaner;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, MediumTests.class })
public class TestLocalRegionCompaction extends LocalRegionTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLocalRegionCompaction.class);
private int compactMin = 4;
private HFileCleaner hfileCleaner;
@Override
protected void configure(LocalRegionParams params) {
params.compactMin(compactMin);
}
@Override
protected void postSetUp() throws IOException {
Configuration conf = htu.getConfiguration();
conf.setLong(TimeToLiveMasterLocalStoreHFileCleaner.TTL_CONF_KEY, 5000);
Path testDir = htu.getDataTestDir();
FileSystem fs = testDir.getFileSystem(conf);
Path globalArchivePath = HFileArchiveUtil.getArchivePath(conf);
hfileCleaner = new HFileCleaner(500, new Stoppable() {
private volatile boolean stopped = false;
@Override
public void stop(String why) {
stopped = true;
}
@Override
public boolean isStopped() {
return stopped;
}
}, conf, fs, globalArchivePath, cleanerPool);
choreService.scheduleChore(hfileCleaner);
}
private int getStorefilesCount() {
return region.region.getStores().stream().mapToInt(Store::getStorefilesCount).sum();
}
private void assertFileCount(FileSystem fs, Path storeArchiveDir, int expected)
throws IOException {
FileStatus[] compactedHFiles = fs.listStatus(storeArchiveDir);
assertEquals(expected, compactedHFiles.length);
}
@Test
public void test() throws IOException, InterruptedException {
for (int i = 0; i < compactMin - 1; i++) {
final int index = i;
region.update(
r -> r.put(new Put(Bytes.toBytes(index)).addColumn(CF1, QUALIFIER, Bytes.toBytes(index))
.addColumn(CF2, QUALIFIER, Bytes.toBytes(index))));
region.flush(true);
}
assertEquals(2 * (compactMin - 1), getStorefilesCount());
region.update(r -> r.put(new Put(Bytes.toBytes(compactMin - 1)).addColumn(CF1, QUALIFIER,
Bytes.toBytes(compactMin - 1))));
region.flusherAndCompactor.requestFlush();
htu.waitFor(15000, () -> getStorefilesCount() == 2);
Path store1ArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(htu.getDataTestDir(),
region.region.getRegionInfo(), CF1);
Path store2ArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(htu.getDataTestDir(),
region.region.getRegionInfo(), CF2);
FileSystem fs = store1ArchiveDir.getFileSystem(htu.getConfiguration());
// after compaction, the old hfiles should have been compacted
htu.waitFor(15000, () -> {
try {
FileStatus[] fses1 = fs.listStatus(store1ArchiveDir);
FileStatus[] fses2 = fs.listStatus(store2ArchiveDir);
return fses1 != null && fses1.length == compactMin && fses2 != null &&
fses2.length == compactMin - 1;
} catch (FileNotFoundException e) {
return false;
}
});
// ttl has not expired, so should not delete any files
Thread.sleep(1000);
FileStatus[] compactedHFiles = fs.listStatus(store1ArchiveDir);
assertEquals(compactMin, compactedHFiles.length);
assertFileCount(fs, store2ArchiveDir, compactMin - 1);
Thread.sleep(2000);
// touch one file
long currentTime = System.currentTimeMillis();
fs.setTimes(compactedHFiles[0].getPath(), currentTime, currentTime);
Thread.sleep(3000);
// only the touched file is still there after clean up
FileStatus[] remainingHFiles = fs.listStatus(store1ArchiveDir);
assertEquals(1, remainingHFiles.length);
assertEquals(compactedHFiles[0].getPath(), remainingHFiles[0].getPath());
assertFalse(fs.exists(store2ArchiveDir));
Thread.sleep(6000);
// the touched file should also be cleaned up and then the cleaner will delete the parent
// directory since it is empty.
assertFalse(fs.exists(store1ArchiveDir));
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
package org.apache.hadoop.hbase.master.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -25,13 +25,17 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -43,17 +47,17 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, MediumTests.class })
public class TestRegionProcedureStoreFlush {
public class TestLocalRegionFlush {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionProcedureStoreFlush.class);
HBaseClassTestRule.forClass(TestLocalRegionFlush.class);
private Configuration conf;
private HRegion region;
private RegionFlusherAndCompactor flusher;
private LocalRegionFlusherAndCompactor flusher;
private AtomicInteger flushCalled;
@ -68,6 +72,8 @@ public class TestRegionProcedureStoreFlush {
HStore store = mock(HStore.class);
when(store.getStorefilesCount()).thenReturn(1);
when(region.getStores()).thenReturn(Collections.singletonList(store));
when(region.getRegionInfo())
.thenReturn(RegionInfoBuilder.newBuilder(TableName.valueOf("hbase:local")).build());
flushCalled = new AtomicInteger(0);
memstoreHeapSize = new AtomicLong(0);
memstoreOffHeapSize = new AtomicLong(0);
@ -90,8 +96,8 @@ public class TestRegionProcedureStoreFlush {
}
}
private void initFlusher() {
flusher = new RegionFlusherAndCompactor(conf, new Abortable() {
private void initFlusher(long flushSize, long flushPerChanges, long flushIntervalMs) {
flusher = new LocalRegionFlusherAndCompactor(conf, new Abortable() {
@Override
public boolean isAborted() {
@ -101,13 +107,12 @@ public class TestRegionProcedureStoreFlush {
@Override
public void abort(String why, Throwable e) {
}
}, region);
}, region, flushSize, flushPerChanges, flushIntervalMs, 4, new Path("/tmp"), "");
}
@Test
public void testTriggerFlushBySize() throws IOException, InterruptedException {
conf.setLong(RegionFlusherAndCompactor.FLUSH_SIZE_KEY, 1024 * 1024);
initFlusher();
initFlusher(1024 * 1024, 1_000_000, TimeUnit.MINUTES.toMillis(15));
memstoreHeapSize.set(1000 * 1024);
flusher.onUpdate();
Thread.sleep(1000);
@ -130,16 +135,14 @@ public class TestRegionProcedureStoreFlush {
@Test
public void testTriggerFlushByChanges() throws InterruptedException {
conf.setLong(RegionFlusherAndCompactor.FLUSH_PER_CHANGES_KEY, 10);
initFlusher();
initFlusher(128 * 1024 * 1024, 10, TimeUnit.MINUTES.toMillis(15));
assertTriggerFlushByChanges(10);
assertTriggerFlushByChanges(10);
}
@Test
public void testPeriodicalFlush() throws InterruptedException {
conf.setLong(RegionFlusherAndCompactor.FLUSH_INTERVAL_MS_KEY, 1000);
initFlusher();
initFlusher(128 * 1024 * 1024, 1_000_000, TimeUnit.SECONDS.toMillis(1));
assertEquals(0, flushCalled.get());
Thread.sleep(1500);
assertEquals(1, flushCalled.get());

View File

@ -0,0 +1,216 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
@Category({ MasterTests.class, MediumTests.class })
public class TestLocalRegionOnTwoFileSystems {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLocalRegionOnTwoFileSystems.class);
private static final HBaseCommonTestingUtility HFILE_UTIL = new HBaseCommonTestingUtility();
private static final HBaseTestingUtility WAL_UTIL = new HBaseTestingUtility();
private static byte[] CF = Bytes.toBytes("f");
private static byte[] CQ = Bytes.toBytes("q");
private static String REGION_DIR_NAME = "local";
private static TableDescriptor TD =
TableDescriptorBuilder.newBuilder(TableName.valueOf("test:local"))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
private static int COMPACT_MIN = 4;
private LocalRegion region;
@BeforeClass
public static void setUp() throws Exception {
WAL_UTIL.startMiniCluster(3);
Configuration conf = HFILE_UTIL.getConfiguration();
conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
Path rootDir = HFILE_UTIL.getDataTestDir();
CommonFSUtils.setRootDir(conf, rootDir);
Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
FileSystem walFs = WAL_UTIL.getTestFileSystem();
CommonFSUtils.setWALRootDir(conf,
walRootDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()));
}
@AfterClass
public static void tearDown() throws IOException {
WAL_UTIL.shutdownMiniDFSCluster();
WAL_UTIL.cleanupTestDir();
HFILE_UTIL.cleanupTestDir();
}
private LocalRegion createLocalRegion(ServerName serverName) throws IOException {
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(HFILE_UTIL.getConfiguration());
when(server.getServerName()).thenReturn(serverName);
LocalRegionParams params = new LocalRegionParams();
params.server(server).regionDirName(REGION_DIR_NAME).tableDescriptor(TD)
.flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
.flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(COMPACT_MIN).maxWals(32)
.useHsync(false).ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
.archivedWalSuffix(LocalStore.ARCHIVED_WAL_SUFFIX)
.archivedHFileSuffix(LocalStore.ARCHIVED_HFILE_SUFFIX);
return LocalRegion.create(params);
}
@Before
public void setUpBeforeTest() throws IOException {
Path rootDir = HFILE_UTIL.getDataTestDir();
FileSystem fs = rootDir.getFileSystem(HFILE_UTIL.getConfiguration());
fs.delete(rootDir, true);
Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
FileSystem walFs = WAL_UTIL.getTestFileSystem();
walFs.delete(walRootDir, true);
region = createLocalRegion(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
}
@After
public void tearDownAfterTest() {
region.close(true);
}
private int getStorefilesCount() {
return Iterables.getOnlyElement(region.region.getStores()).getStorefilesCount();
}
@Test
public void testFlushAndCompact() throws Exception {
for (int i = 0; i < COMPACT_MIN - 1; i++) {
final int index = i;
region
.update(r -> r.put(new Put(Bytes.toBytes(index)).addColumn(CF, CQ, Bytes.toBytes(index))));
region.flush(true);
}
region.requestRollAll();
region.waitUntilWalRollFinished();
region.update(r -> r.put(
new Put(Bytes.toBytes(COMPACT_MIN - 1)).addColumn(CF, CQ, Bytes.toBytes(COMPACT_MIN - 1))));
region.flusherAndCompactor.requestFlush();
HFILE_UTIL.waitFor(15000, () -> getStorefilesCount() == 1);
// make sure the archived hfiles are on the root fs
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
HFILE_UTIL.getDataTestDir(), region.region.getRegionInfo(), CF);
FileSystem rootFs = storeArchiveDir.getFileSystem(HFILE_UTIL.getConfiguration());
HFILE_UTIL.waitFor(15000, () -> {
try {
FileStatus[] fses = rootFs.listStatus(storeArchiveDir);
return fses != null && fses.length == COMPACT_MIN;
} catch (FileNotFoundException e) {
return false;
}
});
// make sure the archived wal files are on the wal fs
Path walArchiveDir = new Path(CommonFSUtils.getWALRootDir(HFILE_UTIL.getConfiguration()),
HConstants.HREGION_OLDLOGDIR_NAME);
HFILE_UTIL.waitFor(15000, () -> {
try {
FileStatus[] fses = WAL_UTIL.getTestFileSystem().listStatus(walArchiveDir);
return fses != null && fses.length == 1;
} catch (FileNotFoundException e) {
return false;
}
});
}
@Test
public void testRecovery() throws IOException {
int countPerRound = 100;
for (int round = 0; round < 5; round++) {
for (int i = 0; i < countPerRound; i++) {
int row = round * countPerRound + i;
Put put = new Put(Bytes.toBytes(row)).addColumn(CF, CQ, Bytes.toBytes(row));
region.update(r -> r.put(put));
}
region.close(true);
region = createLocalRegion(
ServerName.valueOf("localhost", 12345, System.currentTimeMillis() + round + 1));
try (RegionScanner scanner = region.getScanner(new Scan())) {
List<Cell> cells = new ArrayList<>();
boolean moreValues = true;
for (int i = 0; i < (round + 1) * countPerRound; i++) {
assertTrue(moreValues);
moreValues = scanner.next(cells);
assertEquals(1, cells.size());
Result result = Result.create(cells);
cells.clear();
assertEquals(i, Bytes.toInt(result.getRow()));
assertEquals(i, Bytes.toInt(result.getValue(CF, CQ)));
}
assertFalse(moreValues);
}
}
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
package org.apache.hadoop.hbase.master.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -26,62 +26,39 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveProcedureWALCleaner;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveMasterLocalStoreWALCleaner;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.After;
import org.junit.Before;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, MediumTests.class })
public class TestRegionProcedureStoreWALCleaner {
public class TestLocalRegionWALCleaner extends LocalRegionTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionProcedureStoreWALCleaner.class);
HBaseClassTestRule.forClass(TestLocalRegionWALCleaner.class);
private HBaseCommonTestingUtility htu;
private FileSystem fs;
private RegionProcedureStore store;
private ChoreService choreService;
private DirScanPool dirScanPool;
private static long TTL_MS = 5000;
private LogCleaner logCleaner;
private Path globalWALArchiveDir;
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
@Override
protected void postSetUp() throws IOException {
Configuration conf = htu.getConfiguration();
conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
// Runs on local filesystem. Test does not need sync. Turn off checks.
htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
conf.setLong(TimeToLiveMasterLocalStoreWALCleaner.TTL_CONF_KEY, TTL_MS);
Path testDir = htu.getDataTestDir();
fs = testDir.getFileSystem(conf);
CommonFSUtils.setWALRootDir(conf, testDir);
FileSystem fs = testDir.getFileSystem(conf);
globalWALArchiveDir = new Path(testDir, HConstants.HREGION_OLDLOGDIR_NAME);
choreService = new ChoreService("Region-Procedure-Store");
dirScanPool = new DirScanPool(conf);
conf.setLong(TimeToLiveProcedureWALCleaner.TTL_CONF_KEY, 5000);
conf.setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 1000);
logCleaner = new LogCleaner(1000, new Stoppable() {
private volatile boolean stopped = false;
@ -95,30 +72,21 @@ public class TestRegionProcedureStoreWALCleaner {
public boolean isStopped() {
return stopped;
}
}, conf, fs, globalWALArchiveDir, dirScanPool);
}, conf, fs, globalWALArchiveDir, cleanerPool);
choreService.scheduleChore(logCleaner);
store = RegionProcedureStoreTestHelper.createStore(conf, choreService, dirScanPool,
new LoadCounter());
}
@After
public void tearDown() throws IOException {
store.stop(true);
logCleaner.cancel();
dirScanPool.shutdownNow();
choreService.shutdown();
htu.cleanupTestDir();
}
@Test
public void test() throws IOException, InterruptedException {
RegionProcedureStoreTestProcedure proc = new RegionProcedureStoreTestProcedure();
store.insert(proc, null);
store.region.flush(true);
region
.update(r -> r.put(new Put(Bytes.toBytes(1)).addColumn(CF1, QUALIFIER, Bytes.toBytes(1))));
region.flush(true);
Path testDir = htu.getDataTestDir();
FileSystem fs = testDir.getFileSystem(htu.getConfiguration());
// no archived wal files yet
assertFalse(fs.exists(globalWALArchiveDir));
store.walRoller.requestRollAll();
store.walRoller.waitUntilWalRollFinished();
region.requestRollAll();
region.waitUntilWalRollFinished();
// should have one
FileStatus[] files = fs.listStatus(globalWALArchiveDir);
assertEquals(1, files.length);

View File

@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
@ -47,19 +47,15 @@ public class RegionProcedureStorePerformanceEvaluation
private final ServerName serverName =
ServerName.valueOf("localhost", 12345, System.currentTimeMillis());
private final ChoreService choreService;
private volatile boolean abort = false;
public MockServer(Configuration conf) {
this.conf = conf;
this.choreService = new ChoreService("Cleaner-Chore-Service");
}
@Override
public void abort(String why, Throwable e) {
abort = true;
choreService.shutdown();
}
@Override
@ -69,7 +65,6 @@ public class RegionProcedureStorePerformanceEvaluation
@Override
public void stop(String why) {
choreService.shutdown();
}
@Override
@ -114,11 +109,11 @@ public class RegionProcedureStorePerformanceEvaluation
@Override
public ChoreService getChoreService() {
return choreService;
throw new UnsupportedOperationException();
}
}
private DirScanPool cleanerPool;
private LocalStore localStore;
@Override
protected RegionProcedureStore createProcedureStore(Path storeDir) throws IOException {
@ -132,10 +127,11 @@ public class RegionProcedureStorePerformanceEvaluation
int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
initialCountPercentage, null);
conf.setBoolean(RegionProcedureStore.USE_HSYNC_KEY, "hsync".equals(syncType));
conf.setBoolean(LocalStore.USE_HSYNC_KEY, "hsync".equals(syncType));
CommonFSUtils.setRootDir(conf, storeDir);
cleanerPool = new DirScanPool(conf);
return new RegionProcedureStore(new MockServer(conf), cleanerPool, (fs, apth) -> {
MockServer server = new MockServer(conf);
localStore = LocalStore.create(server);
return new RegionProcedureStore(server, localStore, (fs, apth) -> {
});
}
@ -152,7 +148,7 @@ public class RegionProcedureStorePerformanceEvaluation
@Override
protected void postStop(RegionProcedureStore store) throws IOException {
cleanerPool.shutdownNow();
localStore.close(true);
}
public static void main(String[] args) throws IOException {

View File

@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.procedure2.store.region;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@ -30,42 +30,35 @@ import org.junit.After;
import org.junit.Before;
/**
* This runs on local filesystem. hsync and hflush are not supported. May lose data!
* Only use where data loss is not of consequence.
* This runs on local filesystem. hsync and hflush are not supported. May lose data! Only use where
* data loss is not of consequence.
*/
public class RegionProcedureStoreTestBase {
protected HBaseCommonTestingUtility htu;
protected LocalStore localStore;
protected RegionProcedureStore store;
protected ChoreService choreService;
protected DirScanPool cleanerPool;
protected void configure(Configuration conf) {
}
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
htu.getConfiguration().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
Configuration conf = htu.getConfiguration();
conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
// Runs on local filesystem. Test does not need sync. Turn off checks.
htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
configure(htu.getConfiguration());
conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
Path testDir = htu.getDataTestDir();
CommonFSUtils.setWALRootDir(htu.getConfiguration(), testDir);
choreService = new ChoreService(getClass().getSimpleName());
cleanerPool = new DirScanPool(htu.getConfiguration());
store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
cleanerPool, new LoadCounter());
CommonFSUtils.setRootDir(htu.getConfiguration(), testDir);
Server server = RegionProcedureStoreTestHelper.mockServer(conf);
localStore = LocalStore.create(server);
store = RegionProcedureStoreTestHelper.createStore(server, localStore, new LoadCounter());
}
@After
public void tearDown() throws IOException {
store.stop(true);
cleanerPool.shutdownNow();
choreService.shutdown();
localStore.close(true);
htu.cleanupTestDir();
}
}

View File

@ -24,10 +24,9 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
@ -36,14 +35,17 @@ final class RegionProcedureStoreTestHelper {
private RegionProcedureStoreTestHelper() {
}
static RegionProcedureStore createStore(Configuration conf, ChoreService choreService,
DirScanPool cleanerPool, ProcedureLoader loader) throws IOException {
static Server mockServer(Configuration conf) {
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(conf);
when(server.getServerName())
.thenReturn(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
when(server.getChoreService()).thenReturn(choreService);
RegionProcedureStore store = new RegionProcedureStore(server, cleanerPool, new LeaseRecovery() {
return server;
}
static RegionProcedureStore createStore(Server server, LocalStore localStore,
ProcedureLoader loader) throws IOException {
RegionProcedureStore store = new RegionProcedureStore(server, localStore, new LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -93,16 +94,15 @@ public class TestHFileProcedurePrettyPrinter extends RegionProcedureStoreTestBas
store.insert(proc, null);
procs.add(proc);
}
store.region.flush(true);
store.localStore.flush(true);
for (int i = 0; i < 5; i++) {
store.delete(procs.get(i).getProcId());
}
store.region.flush(true);
store.localStore.flush(true);
store.cleanup();
store.region.flush(true);
store.localStore.flush(true);
Path tableDir = CommonFSUtils.getTableDir(
new Path(htu.getDataTestDir(), RegionProcedureStore.MASTER_PROCEDURE_DIR),
RegionProcedureStore.TABLE_NAME);
new Path(htu.getDataTestDir(), LocalStore.MASTER_STORE_DIR), LocalStore.TABLE_NAME);
FileSystem fs = tableDir.getFileSystem(htu.getConfiguration());
Path regionDir =
fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]

View File

@ -126,24 +126,24 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
assertEquals(1, loader.getRunnableCount());
// the row should still be there
assertTrue(store.region
assertTrue(store.localStore
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
assertTrue(store.region
assertTrue(store.localStore
.get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
// proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc
// id
store.cleanup();
assertTrue(store.region
assertTrue(store.localStore
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
assertFalse(store.region
assertFalse(store.localStore
.get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
store.insert(proc4, null);
store.cleanup();
// proc3 should also be deleted as now proc4 holds the max proc id
assertFalse(store.region
assertFalse(store.localStore
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
}
@ -227,7 +227,7 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
@Override
public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
String error) {
String error) {
}
@Override

View File

@ -1,102 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
@Category({ MasterTests.class, MediumTests.class })
public class TestRegionProcedureStoreCompaction extends RegionProcedureStoreTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionProcedureStoreCompaction.class);
private int compactMin = 4;
@Override
protected void configure(Configuration conf) {
conf.setInt(RegionFlusherAndCompactor.COMPACT_MIN_KEY, compactMin);
conf.setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 500);
conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 5000);
}
private int getStorefilesCount() {
return Iterables.getOnlyElement(store.region.getStores()).getStorefilesCount();
}
@Test
public void test() throws IOException, InterruptedException {
for (int i = 0; i < compactMin - 1; i++) {
store.insert(new RegionProcedureStoreTestProcedure(), null);
store.region.flush(true);
}
assertEquals(compactMin - 1, getStorefilesCount());
store.insert(new RegionProcedureStoreTestProcedure(), null);
store.flusherAndCompactor.requestFlush();
htu.waitFor(15000, () -> getStorefilesCount() == 1);
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
new Path(htu.getDataTestDir(), RegionProcedureStore.MASTER_PROCEDURE_DIR),
store.region.getRegionInfo(), RegionProcedureStore.FAMILY);
FileSystem fs = storeArchiveDir.getFileSystem(htu.getConfiguration());
// after compaction, the old hfiles should have been compacted
htu.waitFor(15000, () -> {
try {
FileStatus[] fses = fs.listStatus(storeArchiveDir);
return fses != null && fses.length == compactMin;
} catch (FileNotFoundException e) {
return false;
}
});
// ttl has not expired, so should not delete any files
Thread.sleep(1000);
FileStatus[] compactedHFiles = fs.listStatus(storeArchiveDir);
assertEquals(4, compactedHFiles.length);
Thread.sleep(2000);
// touch one file
long currentTime = System.currentTimeMillis();
fs.setTimes(compactedHFiles[0].getPath(), currentTime, currentTime);
Thread.sleep(3000);
// only the touched file is still there after clean up
FileStatus[] remainingHFiles = fs.listStatus(storeArchiveDir);
assertEquals(1, remainingHFiles.length);
assertEquals(compactedHFiles[0].getPath(), remainingHFiles[0].getPath());
Thread.sleep(6000);
// the touched file should also be cleaned up and then the cleaner will delete the parent
// directory since it is empty.
assertFalse(fs.exists(storeArchiveDir));
}
}

View File

@ -32,14 +32,14 @@ import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@ -65,14 +65,14 @@ public class TestRegionProcedureStoreMigration {
private HBaseCommonTestingUtility htu;
private Server server;
private LocalStore localStore;
private RegionProcedureStore store;
private WALProcedureStore walStore;
private ChoreService choreService;
private DirScanPool cleanerPool;
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
@ -81,7 +81,7 @@ public class TestRegionProcedureStoreMigration {
// Runs on local filesystem. Test does not need sync. Turn off checks.
htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
Path testDir = htu.getDataTestDir();
CommonFSUtils.setWALRootDir(conf, testDir);
CommonFSUtils.setRootDir(conf, testDir);
walStore = new WALProcedureStore(conf, new LeaseRecovery() {
@Override
@ -91,8 +91,8 @@ public class TestRegionProcedureStoreMigration {
walStore.start(1);
walStore.recoverLease();
walStore.load(new LoadCounter());
choreService = new ChoreService(getClass().getSimpleName());
cleanerPool = new DirScanPool(htu.getConfiguration());
server = RegionProcedureStoreTestHelper.mockServer(conf);
localStore = LocalStore.create(server);
}
@After
@ -100,9 +100,8 @@ public class TestRegionProcedureStoreMigration {
if (store != null) {
store.stop(true);
}
localStore.close(true);
walStore.stop(true);
cleanerPool.shutdownNow();
choreService.shutdown();
htu.cleanupTestDir();
}
@ -121,30 +120,29 @@ public class TestRegionProcedureStoreMigration {
SortedSet<RegionProcedureStoreTestProcedure> loadedProcs =
new TreeSet<>((p1, p2) -> Long.compare(p1.getProcId(), p2.getProcId()));
MutableLong maxProcIdSet = new MutableLong(0);
store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
cleanerPool, new ProcedureLoader() {
store = RegionProcedureStoreTestHelper.createStore(server, localStore, new ProcedureLoader() {
@Override
public void setMaxProcId(long maxProcId) {
maxProcIdSet.setValue(maxProcId);
}
@Override
public void setMaxProcId(long maxProcId) {
maxProcIdSet.setValue(maxProcId);
}
@Override
public void load(ProcedureIterator procIter) throws IOException {
while (procIter.hasNext()) {
RegionProcedureStoreTestProcedure proc =
(RegionProcedureStoreTestProcedure) procIter.next();
loadedProcs.add(proc);
}
@Override
public void load(ProcedureIterator procIter) throws IOException {
while (procIter.hasNext()) {
RegionProcedureStoreTestProcedure proc =
(RegionProcedureStoreTestProcedure) procIter.next();
loadedProcs.add(proc);
}
}
@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
if (procIter.hasNext()) {
fail("Found corrupted procedures");
}
@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
if (procIter.hasNext()) {
fail("Found corrupted procedures");
}
});
}
});
assertEquals(10, maxProcIdSet.longValue());
assertEquals(5, loadedProcs.size());
int procId = 1;
@ -168,8 +166,7 @@ public class TestRegionProcedureStoreMigration {
walStore.stop(true);
try {
store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
cleanerPool, new LoadCounter());
store = RegionProcedureStoreTestHelper.createStore(server, localStore, new LoadCounter());
fail("Should fail since AssignProcedure is not supported");
} catch (HBaseIOException e) {
assertThat(e.getMessage(), startsWith("Unsupported"));

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.util.ToolRunner;
@ -57,18 +58,18 @@ public class TestWALProcedurePrettyPrinter extends RegionProcedureStoreTestBase
store.insert(proc, null);
procs.add(proc);
}
store.region.flush(true);
store.localStore.flush(true);
for (int i = 0; i < 5; i++) {
store.delete(procs.get(i).getProcId());
}
store.cleanup();
Path walParentDir = new Path(htu.getDataTestDir(),
RegionProcedureStore.MASTER_PROCEDURE_DIR + "/" + HConstants.HREGION_LOGDIR_NAME);
LocalStore.MASTER_STORE_DIR + "/" + HConstants.HREGION_LOGDIR_NAME);
FileSystem fs = walParentDir.getFileSystem(htu.getConfiguration());
Path walDir = fs.listStatus(walParentDir)[0].getPath();
Path walFile = fs.listStatus(walDir)[0].getPath();
store.walRoller.requestRollAll();
store.walRoller.waitUntilWalRollFinished();
store.localStore.requestRollAll();
store.localStore.waitUntilWalRollFinished();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintStream out = new PrintStream(bos);
WALProcedurePrettyPrinter printer = new WALProcedurePrettyPrinter(out);