@Override
protected boolean validate(Path file) {
- return AbstractFSWALProvider.validateWALFilename(file.getName())
- || MasterProcedureUtil.validateProcedureWALFilename(file.getName());
+ return AbstractFSWALProvider.validateWALFilename(file.getName()) ||
+ MasterProcedureUtil.validateProcedureWALFilename(file.getName()) ||
+ file.getName().endsWith(LocalStore.ARCHIVED_WAL_SUFFIX);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java
index e789752d1ab..4424eb85b3f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java
@@ -17,48 +17,33 @@
*/
package org.apache.hadoop.hbase.master.cleaner;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* HFile cleaner that uses the timestamp of the hfile to determine if it should be deleted. By
* default they are allowed to live for {@value #DEFAULT_TTL}
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class TimeToLiveHFileCleaner extends BaseHFileCleanerDelegate {
+public class TimeToLiveHFileCleaner extends BaseTimeToLiveFileCleaner {
- private static final Logger LOG = LoggerFactory.getLogger(TimeToLiveHFileCleaner.class.getName());
public static final String TTL_CONF_KEY = "hbase.master.hfilecleaner.ttl";
+
// default ttl = 5 minutes
public static final long DEFAULT_TTL = 60000 * 5;
- // Configured time a hfile can be kept after it was moved to the archive
- private long ttl;
@Override
- public void setConf(Configuration conf) {
- this.ttl = conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
- super.setConf(conf);
+ protected long getTtlMs(Configuration conf) {
+ return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
}
@Override
- public boolean isFileDeletable(FileStatus fStat) {
- long currentTime = EnvironmentEdgeManager.currentTime();
- long time = fStat.getModificationTime();
- long life = currentTime - time;
- if (LOG.isTraceEnabled()) {
- LOG.trace("HFile life:" + life + ", ttl:" + ttl + ", current:" + currentTime + ", from: "
- + time);
- }
- if (life < 0) {
- LOG.warn("Found a hfile (" + fStat.getPath() + ") newer than current time (" + currentTime
- + " < " + time + "), probably a clock skew");
- return false;
- }
- return life > ttl;
+ protected boolean valiateFilename(Path file) {
+ return HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent()) ||
+ StoreFileInfo.validateStoreFileName(file.getName());
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
index 670bd8819f5..75fe85731b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
@@ -17,66 +17,31 @@
*/
package org.apache.hadoop.hbase.master.cleaner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
- * Log cleaner that uses the timestamp of the wal to determine if it should
- * be deleted. By default they are allowed to live for 10 minutes.
+ * Log cleaner that uses the timestamp of the wal to determine if it should be deleted. By default
+ * they are allowed to live for 10 minutes.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class TimeToLiveLogCleaner extends BaseLogCleanerDelegate {
- private static final Logger LOG = LoggerFactory.getLogger(TimeToLiveLogCleaner.class.getName());
+public class TimeToLiveLogCleaner extends BaseTimeToLiveFileCleaner {
+
public static final String TTL_CONF_KEY = "hbase.master.logcleaner.ttl";
+
// default ttl = 10 minutes
public static final long DEFAULT_TTL = 600_000L;
- // Configured time a log can be kept after it was closed
- private long ttl;
- private boolean stopped = false;
@Override
- public boolean isFileDeletable(FileStatus fStat) {
- // Files are validated for the second time here,
- // if it causes a bottleneck this logic needs refactored
- if (!AbstractFSWALProvider.validateWALFilename(fStat.getPath().getName())) {
- return true;
- }
-
- long currentTime = EnvironmentEdgeManager.currentTime();
- long time = fStat.getModificationTime();
- long life = currentTime - time;
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Log life:" + life + ", ttl:" + ttl + ", current:" + currentTime + ", from: "
- + time);
- }
- if (life < 0) {
- LOG.warn("Found a log (" + fStat.getPath() + ") newer than current time (" + currentTime
- + " < " + time + "), probably a clock skew");
- return false;
- }
- return life > ttl;
+ protected long getTtlMs(Configuration conf) {
+ return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
}
@Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- this.ttl = conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
- }
-
- @Override
- public void stop(String why) {
- this.stopped = true;
- }
-
- @Override
- public boolean isStopped() {
- return this.stopped;
+ protected boolean valiateFilename(Path file) {
+ return AbstractFSWALProvider.validateWALFilename(file.getName());
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveMasterLocalStoreHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveMasterLocalStoreHFileCleaner.java
new file mode 100644
index 00000000000..843361c6fd0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveMasterLocalStoreHFileCleaner.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.master.store.LocalStore;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Master local storage HFile cleaner that uses the timestamp of the HFile to determine if it should
+ * be deleted. By default they are allowed to live for {@value #DEFAULT_TTL}
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class TimeToLiveMasterLocalStoreHFileCleaner extends BaseTimeToLiveFileCleaner {
+
+ public static final String TTL_CONF_KEY = "hbase.master.local.store.hfilecleaner.ttl";
+
+ // default ttl = 7 days
+ public static final long DEFAULT_TTL = 604_800_000L;
+
+ @Override
+ protected long getTtlMs(Configuration conf) {
+ return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
+ }
+
+ @Override
+ protected boolean valiateFilename(Path file) {
+ return file.getName().endsWith(LocalStore.ARCHIVED_HFILE_SUFFIX);
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveMasterLocalStoreWALCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveMasterLocalStoreWALCleaner.java
new file mode 100644
index 00000000000..e7f61475e89
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveMasterLocalStoreWALCleaner.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.master.store.LocalStore;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Master local storage WAL cleaner that uses the timestamp of the WAL file to determine if it
+ * should be deleted. By default they are allowed to live for {@value #DEFAULT_TTL}
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class TimeToLiveMasterLocalStoreWALCleaner extends BaseTimeToLiveFileCleaner {
+
+ public static final String TTL_CONF_KEY = "hbase.master.local.store.walcleaner.ttl";
+
+ // default ttl = 7 days
+ public static final long DEFAULT_TTL = 604_800_000L;
+
+ @Override
+ protected long getTtlMs(Configuration conf) {
+ return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
+ }
+
+ @Override
+ protected boolean valiateFilename(Path file) {
+ return file.getName().endsWith(LocalStore.ARCHIVED_WAL_SUFFIX);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveProcedureWALCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveProcedureWALCleaner.java
index 5535a4b03b0..ece9f776c46 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveProcedureWALCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveProcedureWALCleaner.java
@@ -18,65 +18,33 @@
package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Procedure WAL cleaner that uses the timestamp of the Procedure WAL to determine if it should be
- * deleted. By default they are allowed to live for {@value #DEFAULT_TTL}
+ * deleted. By default they are allowed to live for {@value #DEFAULT_TTL}.
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. We will not use the procedure wal to store
+ * procedure any more.
*/
+@Deprecated
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class TimeToLiveProcedureWALCleaner extends BaseLogCleanerDelegate {
- private static final Logger LOG =
- LoggerFactory.getLogger(TimeToLiveProcedureWALCleaner.class.getName());
+public class TimeToLiveProcedureWALCleaner extends BaseTimeToLiveFileCleaner {
+
public static final String TTL_CONF_KEY = "hbase.master.procedurewalcleaner.ttl";
+
// default ttl = 7 days
public static final long DEFAULT_TTL = 604_800_000L;
- // Configured time a procedure log can be kept after it was moved to the archive
- private long ttl;
- private boolean stopped = false;
@Override
- public void setConf(Configuration conf) {
- this.ttl = conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
- super.setConf(conf);
+ protected long getTtlMs(Configuration conf) {
+ return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
}
@Override
- public boolean isFileDeletable(FileStatus fStat) {
- // Files are validated for the second time here,
- // if it causes a bottleneck this logic needs refactored
- if (!MasterProcedureUtil.validateProcedureWALFilename(fStat.getPath().getName())) {
- return true;
- }
-
- long currentTime = EnvironmentEdgeManager.currentTime();
- long time = fStat.getModificationTime();
- long life = currentTime - time;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Procedure log life:" + life + ", ttl:" + ttl + ", current:" + currentTime +
- ", from: " + time);
- }
- if (life < 0) {
- LOG.warn("Found a procedure log (" + fStat.getPath() + ") newer than current time ("
- + currentTime + " < " + time + "), probably a clock skew");
- return false;
- }
- return life > ttl;
- }
-
- @Override
- public void stop(String why) {
- this.stopped = true;
- }
-
- @Override
- public boolean isStopped() {
- return this.stopped;
+ protected boolean valiateFilename(Path file) {
+ return MasterProcedureUtil.validateProcedureWALFilename(file.getName());
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
index 49bf5c85322..23d6ecb284a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
@@ -152,16 +152,13 @@ public final class MasterProcedureUtil {
@Deprecated
private static final Pattern PATTERN = Pattern.compile(".*pv2-\\d{20}.log");
- // Use the character $ to let the log cleaner know that this is not the normal wal file.
- public static final String ARCHIVED_PROC_WAL_SUFFIX = "$masterproc$";
-
/**
* A Procedure WAL file name is of the format: pv-<wal-id>.log where wal-id is 20 digits.
* @param filename name of the file to validate
* @return true if the filename matches a Procedure WAL, false otherwise
*/
public static boolean validateProcedureWALFilename(String filename) {
- return PATTERN.matcher(filename).matches() || filename.endsWith(ARCHIVED_PROC_WAL_SUFFIX);
+ return PATTERN.matcher(filename).matches();
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegion.java
new file mode 100644
index 00000000000..506de309ff9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegion.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
+
+/**
+ * A region that stores data in a separated directory.
+ *
+ * FileSystem layout:
+ *
+ *
+ * hbase
+ * |
+ * --<region dir>
+ * |
+ * --data
+ * | |
+ * | --/<ns>/<table>/<encoded-region-name> <---- The region data
+ * | |
+ * | --replay <---- The edits to replay
+ * |
+ * --WALs
+ * |
+ * --<master-server-name> <---- The WAL dir for active master
+ * |
+ * --<master-server-name>-dead <---- The WAL dir for dead master
+ *
+ *
+ * Notice that, you can use different root file system and WAL file system. Then the above directory
+ * will be on two file systems, the root file system will have the data directory while the WAL
+ * filesystem will have the WALs directory. The archived HFile will be moved to the global HFile
+ * archived directory with the {@link LocalRegionParams#archivedWalSuffix()} suffix. The archived
+ * WAL will be moved to the global WAL archived directory with the
+ * {@link LocalRegionParams#archivedHFileSuffix()} suffix.
+ */
+@InterfaceAudience.Private
+public final class LocalRegion {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LocalRegion.class);
+
+ private static final String REPLAY_EDITS_DIR = "recovered.wals";
+
+ private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
+
+ private static final int REGION_ID = 1;
+
+ private final WALFactory walFactory;
+
+ @VisibleForTesting
+ final HRegion region;
+
+ @VisibleForTesting
+ final LocalRegionFlusherAndCompactor flusherAndCompactor;
+
+ private LocalRegionWALRoller walRoller;
+
+ private LocalRegion(HRegion region, WALFactory walFactory,
+ LocalRegionFlusherAndCompactor flusherAndCompactor, LocalRegionWALRoller walRoller) {
+ this.region = region;
+ this.walFactory = walFactory;
+ this.flusherAndCompactor = flusherAndCompactor;
+ this.walRoller = walRoller;
+ }
+
+ private void closeRegion(boolean abort) {
+ try {
+ region.close(abort);
+ } catch (IOException e) {
+ LOG.warn("Failed to close region", e);
+ }
+ }
+
+ private void shutdownWAL() {
+ try {
+ walFactory.shutdown();
+ } catch (IOException e) {
+ LOG.warn("Failed to shutdown WAL", e);
+ }
+ }
+
+ public void update(UpdateLocalRegion action) throws IOException {
+ action.update(region);
+ flusherAndCompactor.onUpdate();
+ }
+
+ public Result get(Get get) throws IOException {
+ return region.get(get);
+ }
+
+ public RegionScanner getScanner(Scan scan) throws IOException {
+ return region.getScanner(scan);
+ }
+
+ @VisibleForTesting
+ FlushResult flush(boolean force) throws IOException {
+ return region.flush(force);
+ }
+
+ @VisibleForTesting
+ void requestRollAll() {
+ walRoller.requestRollAll();
+ }
+
+ @VisibleForTesting
+ void waitUntilWalRollFinished() throws InterruptedException {
+ walRoller.waitUntilWalRollFinished();
+ }
+
+ public void close(boolean abort) {
+ LOG.info("Closing local region {}, isAbort={}", region.getRegionInfo(), abort);
+ if (flusherAndCompactor != null) {
+ flusherAndCompactor.close();
+ }
+ // if abort, we shutdown wal first to fail the ongoing updates to the region, and then close the
+ // region, otherwise there will be dead lock.
+ if (abort) {
+ shutdownWAL();
+ closeRegion(true);
+ } else {
+ closeRegion(false);
+ shutdownWAL();
+ }
+
+ if (walRoller != null) {
+ walRoller.close();
+ }
+ }
+
+ private static WAL createWAL(WALFactory walFactory, LocalRegionWALRoller walRoller,
+ String serverName, FileSystem walFs, Path walRootDir, RegionInfo regionInfo)
+ throws IOException {
+ String logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
+ Path walDir = new Path(walRootDir, logName);
+ LOG.debug("WALDir={}", walDir);
+ if (walFs.exists(walDir)) {
+ throw new HBaseIOException(
+ "Already created wal directory at " + walDir + " for local region " + regionInfo);
+ }
+ if (!walFs.mkdirs(walDir)) {
+ throw new IOException(
+ "Can not create wal directory " + walDir + " for local region " + regionInfo);
+ }
+ WAL wal = walFactory.getWAL(regionInfo);
+ walRoller.addWAL(wal);
+ return wal;
+ }
+
+ private static HRegion bootstrap(Configuration conf, TableDescriptor td, FileSystem fs,
+ Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
+ LocalRegionWALRoller walRoller, String serverName) throws IOException {
+ TableName tn = td.getTableName();
+ RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setRegionId(REGION_ID).build();
+ Path tmpTableDir = CommonFSUtils.getTableDir(rootDir,
+ TableName.valueOf(tn.getNamespaceAsString(), tn.getQualifierAsString() + "-tmp"));
+ if (fs.exists(tmpTableDir) && !fs.delete(tmpTableDir, true)) {
+ throw new IOException("Can not delete partial created proc region " + tmpTableDir);
+ }
+ HRegion.createHRegion(conf, regionInfo, fs, tmpTableDir, td).close();
+ Path tableDir = CommonFSUtils.getTableDir(rootDir, tn);
+ if (!fs.rename(tmpTableDir, tableDir)) {
+ throw new IOException("Can not rename " + tmpTableDir + " to " + tableDir);
+ }
+ WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
+ return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
+ }
+
+ private static HRegion open(Configuration conf, TableDescriptor td, FileSystem fs, Path rootDir,
+ FileSystem walFs, Path walRootDir, WALFactory walFactory, LocalRegionWALRoller walRoller,
+ String serverName) throws IOException {
+ Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
+ Path regionDir =
+ fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
+ .getPath();
+ RegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+
+ Path walRegionDir = FSUtils.getRegionDirFromRootDir(walRootDir, regionInfo);
+ Path replayEditsDir = new Path(walRegionDir, REPLAY_EDITS_DIR);
+ if (!walFs.exists(replayEditsDir) && !walFs.mkdirs(replayEditsDir)) {
+ throw new IOException("Failed to create replay directory: " + replayEditsDir);
+ }
+ Path walsDir = new Path(walRootDir, HREGION_LOGDIR_NAME);
+ for (FileStatus walDir : walFs.listStatus(walsDir)) {
+ if (!walDir.isDirectory()) {
+ continue;
+ }
+ if (walDir.getPath().getName().startsWith(serverName)) {
+ LOG.warn("This should not happen in real production as we have not created our WAL " +
+ "directory yet, ignore if you are running a local region related UT");
+ }
+ Path deadWALDir;
+ if (!walDir.getPath().getName().endsWith(DEAD_WAL_DIR_SUFFIX)) {
+ deadWALDir =
+ new Path(walDir.getPath().getParent(), walDir.getPath().getName() + DEAD_WAL_DIR_SUFFIX);
+ if (!walFs.rename(walDir.getPath(), deadWALDir)) {
+ throw new IOException("Can not rename " + walDir + " to " + deadWALDir +
+ " when recovering lease of proc store");
+ }
+ LOG.info("Renamed {} to {} as it is dead", walDir.getPath(), deadWALDir);
+ } else {
+ deadWALDir = walDir.getPath();
+ LOG.info("{} is already marked as dead", deadWALDir);
+ }
+ for (FileStatus walFile : walFs.listStatus(deadWALDir)) {
+ Path replayEditsFile = new Path(replayEditsDir, walFile.getPath().getName());
+ RecoverLeaseFSUtils.recoverFileLease(walFs, walFile.getPath(), conf);
+ if (!walFs.rename(walFile.getPath(), replayEditsFile)) {
+ throw new IOException("Can not rename " + walFile.getPath() + " to " + replayEditsFile +
+ " when recovering lease for local region");
+ }
+ LOG.info("Renamed {} to {}", walFile.getPath(), replayEditsFile);
+ }
+ LOG.info("Delete empty local region wal dir {}", deadWALDir);
+ walFs.delete(deadWALDir, true);
+ }
+
+ WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
+ conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
+ replayEditsDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()).toString());
+ return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
+ }
+
+ public static LocalRegion create(LocalRegionParams params) throws IOException {
+ TableDescriptor td = params.tableDescriptor();
+ LOG.info("Create or load local region for table " + td);
+ Server server = params.server();
+ Configuration baseConf = server.getConfiguration();
+ FileSystem fs = CommonFSUtils.getRootDirFileSystem(baseConf);
+ FileSystem walFs = CommonFSUtils.getWALFileSystem(baseConf);
+ Path globalRootDir = CommonFSUtils.getRootDir(baseConf);
+ Path globalWALRootDir = CommonFSUtils.getWALRootDir(baseConf);
+ Path rootDir = new Path(globalRootDir, params.regionDirName());
+ Path walRootDir = new Path(globalWALRootDir, params.regionDirName());
+ // we will override some configurations so create a new one.
+ Configuration conf = new Configuration(baseConf);
+ CommonFSUtils.setRootDir(conf, rootDir);
+ CommonFSUtils.setWALRootDir(conf, walRootDir);
+ LocalRegionFlusherAndCompactor.setupConf(conf, params.flushSize(), params.flushPerChanges(),
+ params.flushIntervalMs());
+ conf.setInt(AbstractFSWAL.MAX_LOGS, params.maxWals());
+ if (params.useHsync() != null) {
+ conf.setBoolean(HRegion.WAL_HSYNC_CONF_KEY, params.useHsync());
+ }
+ conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT,
+ IntMath.ceilingPowerOfTwo(params.ringBufferSlotCount()));
+
+ LocalRegionWALRoller walRoller = LocalRegionWALRoller.create(td.getTableName() + "-WAL-Roller",
+ conf, server, walFs, walRootDir, globalWALRootDir, params.archivedWalSuffix(),
+ params.rollPeriodMs(), params.flushSize());
+ walRoller.start();
+
+ WALFactory walFactory = new WALFactory(conf, server.getServerName().toString());
+ Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
+ HRegion region;
+ if (fs.exists(tableDir)) {
+ // load the existing region.
+ region = open(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
+ server.getServerName().toString());
+ } else {
+ // bootstrapping...
+ region = bootstrap(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
+ server.getServerName().toString());
+ }
+ Path globalArchiveDir = HFileArchiveUtil.getArchivePath(baseConf);
+ LocalRegionFlusherAndCompactor flusherAndCompactor = new LocalRegionFlusherAndCompactor(conf,
+ server, region, params.flushSize(), params.flushPerChanges(), params.flushIntervalMs(),
+ params.compactMin(), globalArchiveDir, params.archivedHFileSuffix());
+ walRoller.setFlusherAndCompactor(flusherAndCompactor);
+ Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
+ if (!fs.mkdirs(archiveDir)) {
+ LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will" +
+ " be created again when we actually archive the hfiles later, so continue", archiveDir);
+ }
+ return new LocalRegion(region, walFactory, flusherAndCompactor, walRoller);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionFlusherAndCompactor.java
similarity index 71%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionFlusherAndCompactor.java
index 57e62ddf654..b177a59b6e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionFlusherAndCompactor.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2.store.region;
+package org.apache.hadoop.hbase.master.store;
import java.io.Closeable;
import java.io.IOException;
@@ -27,22 +27,26 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
- * As long as there is no RegionServerServices for the procedure store region, we need implement the
- * flush and compaction logic by our own.
+ * As long as there is no RegionServerServices for a 'local' region, we need implement the flush and
+ * compaction logic by our own.
*
* The flush logic is very simple, every time after calling a modification method in
* {@link RegionProcedureStore}, we will call the {@link #onUpdate()} method below, and in this
@@ -53,26 +57,11 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* count, if it is above the compactMin, we will do a major compaction.
*/
@InterfaceAudience.Private
-class RegionFlusherAndCompactor implements Closeable {
+class LocalRegionFlusherAndCompactor implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(RegionFlusherAndCompactor.class);
+ private static final Logger LOG = LoggerFactory.getLogger(LocalRegionFlusherAndCompactor.class);
- static final String FLUSH_SIZE_KEY = "hbase.procedure.store.region.flush.size";
-
- static final long DEFAULT_FLUSH_SIZE = TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE;
-
- static final String FLUSH_PER_CHANGES_KEY = "hbase.procedure.store.region.flush.per.changes";
-
- private static final long DEFAULT_FLUSH_PER_CHANGES = 1_000_000;
-
- static final String FLUSH_INTERVAL_MS_KEY = "hbase.procedure.store.region.flush.interval.ms";
-
- // default to flush every 15 minutes, for safety
- private static final long DEFAULT_FLUSH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(15);
-
- static final String COMPACT_MIN_KEY = "hbase.procedure.store.region.compact.min";
-
- private static final int DEFAULT_COMPACT_MIN = 4;
+ private final Configuration conf;
private final Abortable abortable;
@@ -90,6 +79,10 @@ class RegionFlusherAndCompactor implements Closeable {
private final int compactMin;
+ private final Path globalArchivePath;
+
+ private final String archivedHFileSuffix;
+
private final Thread flushThread;
private final Lock flushLock = new ReentrantLock();
@@ -108,38 +101,60 @@ class RegionFlusherAndCompactor implements Closeable {
private volatile boolean closed = false;
- RegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region) {
+ LocalRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region,
+ long flushSize, long flushPerChanges, long flushIntervalMs, int compactMin,
+ Path globalArchivePath, String archivedHFileSuffix) {
+ this.conf = conf;
this.abortable = abortable;
this.region = region;
- flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
- flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
- flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
- compactMin = conf.getInt(COMPACT_MIN_KEY, DEFAULT_COMPACT_MIN);
- flushThread = new Thread(this::flushLoop, "Procedure-Region-Store-Flusher");
+ this.flushSize = flushSize;
+ this.flushPerChanges = flushPerChanges;
+ this.flushIntervalMs = flushIntervalMs;
+ this.compactMin = compactMin;
+ this.globalArchivePath = globalArchivePath;
+ this.archivedHFileSuffix = archivedHFileSuffix;
+ flushThread = new Thread(this::flushLoop, region.getRegionInfo().getTable() + "-Flusher");
flushThread.setDaemon(true);
flushThread.start();
compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
+ .setNameFormat(region.getRegionInfo().getTable() + "-Store-Compactor").setDaemon(true)
+ .build());
LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}",
flushSize, flushPerChanges, flushIntervalMs, compactMin);
}
// inject our flush related configurations
- static void setupConf(Configuration conf) {
- long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
+ static void setupConf(Configuration conf, long flushSize, long flushPerChanges,
+ long flushIntervalMs) {
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSize);
- long flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges);
- long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs);
LOG.info("Injected flushSize={}, flushPerChanges={}, flushIntervalMs={}", flushSize,
flushPerChanges, flushIntervalMs);
}
+ private void moveHFileToGlobalArchiveDir() throws IOException {
+ FileSystem fs = region.getRegionFileSystem().getFileSystem();
+ for (HStore store : region.getStores()) {
+ store.closeAndArchiveCompactedFiles();
+ Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, region.getRegionInfo(),
+ store.getColumnFamilyDescriptor().getName());
+ Path globalStoreArchiveDir = HFileArchiveUtil.getStoreArchivePathForArchivePath(
+ globalArchivePath, region.getRegionInfo(), store.getColumnFamilyDescriptor().getName());
+ try {
+ LocalRegionUtils.moveFilesUnderDir(fs, storeArchiveDir, globalStoreArchiveDir,
+ archivedHFileSuffix);
+ } catch (IOException e) {
+ LOG.warn("Failed to move archived hfiles from {} to global dir {}", storeArchiveDir,
+ globalStoreArchiveDir, e);
+ }
+ }
+ }
+
private void compact() {
try {
region.compact(true);
- Iterables.getOnlyElement(region.getStores()).closeAndArchiveCompactedFiles();
+ moveHFileToGlobalArchiveDir();
} catch (IOException e) {
LOG.error("Failed to compact procedure store region", e);
}
@@ -156,7 +171,12 @@ class RegionFlusherAndCompactor implements Closeable {
}
private boolean needCompaction() {
- return Iterables.getOnlyElement(region.getStores()).getStorefilesCount() >= compactMin;
+ for (Store store : region.getStores()) {
+ if (store.getStorefilesCount() >= compactMin) {
+ return true;
+ }
+ }
+ return false;
}
private void flushLoop() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionParams.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionParams.java
new file mode 100644
index 00000000000..85bcaae4c86
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionParams.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The parameters for constructing {@link LocalRegion}.
+ */
+@InterfaceAudience.Private
+public class LocalRegionParams {
+
+ private Server server;
+
+ private String regionDirName;
+
+ private TableDescriptor tableDescriptor;
+
+ private Long flushSize;
+
+ private Long flushPerChanges;
+
+ private Long flushIntervalMs;
+
+ private Integer compactMin;
+
+ private Integer maxWals;
+
+ private Boolean useHsync;
+
+ private Integer ringBufferSlotCount;
+
+ private Long rollPeriodMs;
+
+ private String archivedWalSuffix;
+
+ private String archivedHFileSuffix;
+
+ public LocalRegionParams server(Server server) {
+ this.server = server;
+ return this;
+ }
+
+ public LocalRegionParams regionDirName(String regionDirName) {
+ this.regionDirName = regionDirName;
+ return this;
+ }
+
+ public LocalRegionParams tableDescriptor(TableDescriptor tableDescriptor) {
+ this.tableDescriptor = tableDescriptor;
+ return this;
+ }
+
+ public LocalRegionParams flushSize(long flushSize) {
+ this.flushSize = flushSize;
+ return this;
+ }
+
+ public LocalRegionParams flushPerChanges(long flushPerChanges) {
+ this.flushPerChanges = flushPerChanges;
+ return this;
+ }
+
+ public LocalRegionParams flushIntervalMs(long flushIntervalMs) {
+ this.flushIntervalMs = flushIntervalMs;
+ return this;
+ }
+
+ public LocalRegionParams compactMin(int compactMin) {
+ this.compactMin = compactMin;
+ return this;
+ }
+
+ public LocalRegionParams maxWals(int maxWals) {
+ this.maxWals = maxWals;
+ return this;
+ }
+
+ public LocalRegionParams useHsync(boolean useHsync) {
+ this.useHsync = useHsync;
+ return this;
+ }
+
+ public LocalRegionParams ringBufferSlotCount(int ringBufferSlotCount) {
+ this.ringBufferSlotCount = ringBufferSlotCount;
+ return this;
+ }
+
+ public LocalRegionParams rollPeriodMs(long rollPeriodMs) {
+ this.rollPeriodMs = rollPeriodMs;
+ return this;
+ }
+
+ public LocalRegionParams archivedWalSuffix(String archivedWalSuffix) {
+ this.archivedWalSuffix = archivedWalSuffix;
+ return this;
+ }
+
+ public LocalRegionParams archivedHFileSuffix(String archivedHFileSuffix) {
+ this.archivedHFileSuffix = archivedHFileSuffix;
+ return this;
+ }
+
+ public Server server() {
+ return server;
+ }
+
+ public String regionDirName() {
+ return regionDirName;
+ }
+
+ public TableDescriptor tableDescriptor() {
+ return tableDescriptor;
+ }
+
+ public long flushSize() {
+ return flushSize;
+ }
+
+ public long flushPerChanges() {
+ return flushPerChanges;
+ }
+
+ public long flushIntervalMs() {
+ return flushIntervalMs;
+ }
+
+ public int compactMin() {
+ return compactMin;
+ }
+
+ public int maxWals() {
+ return maxWals;
+ }
+
+ public Boolean useHsync() {
+ return useHsync;
+ }
+
+ public int ringBufferSlotCount() {
+ return ringBufferSlotCount;
+ }
+
+ public long rollPeriodMs() {
+ return rollPeriodMs;
+ }
+
+ public String archivedWalSuffix() {
+ return archivedWalSuffix;
+ }
+
+ public String archivedHFileSuffix() {
+ return archivedHFileSuffix;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionUtils.java
new file mode 100644
index 00000000000..a538db6d4df
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionUtils.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+final class LocalRegionUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LocalRegionUtils.class);
+
+ private LocalRegionUtils() {
+ }
+
+ static void moveFilesUnderDir(FileSystem fs, Path src, Path dst, String suffix)
+ throws IOException {
+ if (!fs.exists(dst) && !fs.mkdirs(dst)) {
+ LOG.warn("Failed to create dir {}", dst);
+ return;
+ }
+ FileStatus[] archivedWALFiles = fs.listStatus(src);
+ if (archivedWALFiles == null) {
+ return;
+ }
+ for (FileStatus status : archivedWALFiles) {
+ Path file = status.getPath();
+ Path newFile = new Path(dst, file.getName() + suffix);
+ if (fs.rename(file, newFile)) {
+ LOG.info("Moved {} to {}", file, newFile);
+ } else {
+ LOG.warn("Failed to move from {} to {}", file, newFile);
+ }
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionWALRoller.java
similarity index 56%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionWALRoller.java
index d24924f14a3..2880a388280 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionWALRoller.java
@@ -15,18 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2.store.region;
+package org.apache.hadoop.hbase.master.store;
import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME;
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -38,23 +35,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * As long as there is no RegionServerServices for the procedure store region, we need implement log
- * roller logic by our own.
+ * As long as there is no RegionServerServices for a local region, we need implement log roller
+ * logic by our own.
*
* We can reuse most of the code for normal wal roller, the only difference is that there is only
* one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the
* procedure store region.
*/
@InterfaceAudience.Private
-final class RegionProcedureStoreWALRoller extends AbstractWALRoller {
+public final class LocalRegionWALRoller extends AbstractWALRoller {
- private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStoreWALRoller.class);
+ private static final Logger LOG = LoggerFactory.getLogger(LocalRegionWALRoller.class);
- static final String ROLL_PERIOD_MS_KEY = "hbase.procedure.store.region.walroll.period.ms";
-
- private static final long DEFAULT_ROLL_PERIOD_MS = TimeUnit.MINUTES.toMillis(15);
-
- private volatile RegionFlusherAndCompactor flusherAndCompactor;
+ private volatile LocalRegionFlusherAndCompactor flusherAndCompactor;
private final FileSystem fs;
@@ -62,36 +55,22 @@ final class RegionProcedureStoreWALRoller extends AbstractWALRoller {
private final Path globalWALArchiveDir;
- private RegionProcedureStoreWALRoller(Configuration conf, Abortable abortable, FileSystem fs,
- Path walRootDir, Path globalWALRootDir) {
- super("RegionProcedureStoreWALRoller", conf, abortable);
+ private final String archivedWALSuffix;
+
+ private LocalRegionWALRoller(String name, Configuration conf, Abortable abortable, FileSystem fs,
+ Path walRootDir, Path globalWALRootDir, String archivedWALSuffix) {
+ super(name, conf, abortable);
this.fs = fs;
this.walArchiveDir = new Path(walRootDir, HREGION_OLDLOGDIR_NAME);
this.globalWALArchiveDir = new Path(globalWALRootDir, HREGION_OLDLOGDIR_NAME);
+ this.archivedWALSuffix = archivedWALSuffix;
}
@Override
protected void afterRoll(WAL wal) {
// move the archived WAL files to the global archive path
try {
- if (!fs.exists(globalWALArchiveDir) && !fs.mkdirs(globalWALArchiveDir)) {
- LOG.warn("Failed to create global archive dir {}", globalWALArchiveDir);
- return;
- }
- FileStatus[] archivedWALFiles = fs.listStatus(walArchiveDir);
- if (archivedWALFiles == null) {
- return;
- }
- for (FileStatus status : archivedWALFiles) {
- Path file = status.getPath();
- Path newFile = new Path(globalWALArchiveDir,
- file.getName() + MasterProcedureUtil.ARCHIVED_PROC_WAL_SUFFIX);
- if (fs.rename(file, newFile)) {
- LOG.info("Moved {} to {}", file, newFile);
- } else {
- LOG.warn("Failed to move archived wal from {} to global place {}", file, newFile);
- }
- }
+ LocalRegionUtils.moveFilesUnderDir(fs, walArchiveDir, globalWALArchiveDir, archivedWALSuffix);
} catch (IOException e) {
LOG.warn("Failed to move archived wals from {} to global dir {}", walArchiveDir,
globalWALArchiveDir, e);
@@ -100,28 +79,29 @@ final class RegionProcedureStoreWALRoller extends AbstractWALRoller {
@Override
protected void scheduleFlush(String encodedRegionName) {
- RegionFlusherAndCompactor flusher = this.flusherAndCompactor;
+ LocalRegionFlusherAndCompactor flusher = this.flusherAndCompactor;
if (flusher != null) {
flusher.requestFlush();
}
}
- void setFlusherAndCompactor(RegionFlusherAndCompactor flusherAndCompactor) {
+ void setFlusherAndCompactor(LocalRegionFlusherAndCompactor flusherAndCompactor) {
this.flusherAndCompactor = flusherAndCompactor;
}
- static RegionProcedureStoreWALRoller create(Configuration conf, Abortable abortable,
- FileSystem fs, Path walRootDir, Path globalWALRootDir) {
+ static LocalRegionWALRoller create(String name, Configuration conf, Abortable abortable,
+ FileSystem fs, Path walRootDir, Path globalWALRootDir, String archivedWALSuffix,
+ long rollPeriodMs, long flushSize) {
// we can not run with wal disabled, so force set it to true.
conf.setBoolean(WALFactory.WAL_ENABLED, true);
// we do not need this feature, so force disable it.
conf.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, false);
- conf.setLong(WAL_ROLL_PERIOD_KEY, conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS));
- long flushSize = conf.getLong(RegionFlusherAndCompactor.FLUSH_SIZE_KEY,
- RegionFlusherAndCompactor.DEFAULT_FLUSH_SIZE);
+ conf.setLong(WAL_ROLL_PERIOD_KEY, rollPeriodMs);
// make the roll size the same with the flush size, as we only have one region here
conf.setLong(WALUtil.WAL_BLOCK_SIZE, flushSize * 2);
conf.setFloat(AbstractFSWAL.WAL_ROLL_MULTIPLIER, 0.5f);
- return new RegionProcedureStoreWALRoller(conf, abortable, fs, walRootDir, globalWALRootDir);
+ return new LocalRegionWALRoller(name, conf, abortable, fs, walRootDir, globalWALRootDir,
+ archivedWALSuffix);
}
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalStore.java
new file mode 100644
index 00000000000..d2c68842630
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalStore.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Used for storing data at master side. The data will be stored in a {@link LocalRegion}.
+ */
+@InterfaceAudience.Private
+public final class LocalStore {
+
+ // Use the character $ to let the log cleaner know that this is not the normal wal file.
+ public static final String ARCHIVED_WAL_SUFFIX = "$masterlocalwal$";
+
+ // this is a bit trick that in StoreFileInfo.validateStoreFileName, we just test if the file name
+ // contains '-' to determine if it is a valid store file, so here we have to add '-'in the file
+ // name to avoid being processed by normal TimeToLiveHFileCleaner.
+ public static final String ARCHIVED_HFILE_SUFFIX = "$-masterlocalhfile-$";
+
+ private static final String MAX_WALS_KEY = "hbase.master.store.region.maxwals";
+
+ private static final int DEFAULT_MAX_WALS = 10;
+
+ public static final String USE_HSYNC_KEY = "hbase.master.store.region.wal.hsync";
+
+ public static final String MASTER_STORE_DIR = "MasterData";
+
+ private static final String FLUSH_SIZE_KEY = "hbase.master.store.region.flush.size";
+
+ private static final long DEFAULT_FLUSH_SIZE = TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE;
+
+ private static final String FLUSH_PER_CHANGES_KEY = "hbase.master.store.region.flush.per.changes";
+
+ private static final long DEFAULT_FLUSH_PER_CHANGES = 1_000_000;
+
+ private static final String FLUSH_INTERVAL_MS_KEY = "hbase.master.store.region.flush.interval.ms";
+
+ // default to flush every 15 minutes, for safety
+ private static final long DEFAULT_FLUSH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(15);
+
+ private static final String COMPACT_MIN_KEY = "hbase.master.store.region.compact.min";
+
+ private static final int DEFAULT_COMPACT_MIN = 4;
+
+ private static final String ROLL_PERIOD_MS_KEY = "hbase.master.store.region.walroll.period.ms";
+
+ private static final long DEFAULT_ROLL_PERIOD_MS = TimeUnit.MINUTES.toMillis(15);
+
+ private static final String RING_BUFFER_SLOT_COUNT = "hbase.master.store.ringbuffer.slot.count";
+
+ private static final int DEFAULT_RING_BUFFER_SLOT_COUNT = 128;
+
+ public static final TableName TABLE_NAME = TableName.valueOf("master:store");
+
+ public static final byte[] PROC_FAMILY = Bytes.toBytes("proc");
+
+ private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(PROC_FAMILY)).build();
+
+ private final LocalRegion region;
+
+ private LocalStore(LocalRegion region) {
+ this.region = region;
+ }
+
+ public void update(UpdateLocalRegion action) throws IOException {
+ region.update(action);
+ }
+
+ public Result get(Get get) throws IOException {
+ return region.get(get);
+ }
+
+ public RegionScanner getScanner(Scan scan) throws IOException {
+ return region.getScanner(scan);
+ }
+
+ public void close(boolean abort) {
+ region.close(abort);
+ }
+
+ @VisibleForTesting
+ public FlushResult flush(boolean force) throws IOException {
+ return region.flush(force);
+ }
+
+ @VisibleForTesting
+ public void requestRollAll() {
+ region.requestRollAll();
+ }
+
+ @VisibleForTesting
+ public void waitUntilWalRollFinished() throws InterruptedException {
+ region.waitUntilWalRollFinished();
+ }
+
+ public static LocalStore create(Server server) throws IOException {
+ LocalRegionParams params = new LocalRegionParams().server(server)
+ .regionDirName(MASTER_STORE_DIR).tableDescriptor(TABLE_DESC);
+ Configuration conf = server.getConfiguration();
+ long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
+ long flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
+ long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
+ int compactMin = conf.getInt(COMPACT_MIN_KEY, DEFAULT_COMPACT_MIN);
+ params.flushSize(flushSize).flushPerChanges(flushPerChanges).flushIntervalMs(flushIntervalMs)
+ .compactMin(compactMin);
+ int maxWals = conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS);
+ params.maxWals(maxWals);
+ if (conf.get(USE_HSYNC_KEY) != null) {
+ params.useHsync(conf.getBoolean(USE_HSYNC_KEY, false));
+ }
+ params.ringBufferSlotCount(conf.getInt(RING_BUFFER_SLOT_COUNT, DEFAULT_RING_BUFFER_SLOT_COUNT));
+ long rollPeriodMs = conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS);
+ params.rollPeriodMs(rollPeriodMs).archivedWalSuffix(ARCHIVED_WAL_SUFFIX)
+ .archivedHFileSuffix(ARCHIVED_HFILE_SUFFIX);
+ LocalRegion region = LocalRegion.create(params);
+ return new LocalStore(region);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/UpdateLocalRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/UpdateLocalRegion.java
new file mode 100644
index 00000000000..bfd279c053a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/UpdateLocalRegion.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+@FunctionalInterface
+public interface UpdateLocalRegion {
+
+ void update(HRegion region) throws IOException;
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/HFileProcedurePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/HFileProcedurePrettyPrinter.java
index 110547fcdd4..4aac5da17d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/HFileProcedurePrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/HFileProcedurePrettyPrinter.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
@@ -76,16 +77,15 @@ public class HFileProcedurePrettyPrinter extends AbstractHBaseTool {
addOptWithArg("w", "seekToPid", "Seek to this procedure id and print this procedure only");
OptionGroup files = new OptionGroup();
files.addOption(new Option("f", "file", true,
- "File to scan. Pass full-path; e.g. hdfs://a:9000/MasterProcs/master/procedure/p/xxx"));
+ "File to scan. Pass full-path; e.g. hdfs://a:9000/MasterProcs/master/local/proc/xxx"));
files.addOption(new Option("a", "all", false, "Scan the whole procedure region."));
files.setRequired(true);
options.addOptionGroup(files);
}
private void addAllHFiles() throws IOException {
- Path masterProcDir =
- new Path(CommonFSUtils.getWALRootDir(conf), RegionProcedureStore.MASTER_PROCEDURE_DIR);
- Path tableDir = CommonFSUtils.getTableDir(masterProcDir, RegionProcedureStore.TABLE_NAME);
+ Path masterProcDir = new Path(CommonFSUtils.getRootDir(conf), LocalStore.MASTER_STORE_DIR);
+ Path tableDir = CommonFSUtils.getTableDir(masterProcDir, LocalStore.TABLE_NAME);
FileSystem fs = tableDir.getFileSystem(conf);
Path regionDir =
fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
index 3cdd8178238..45f411b05d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
@@ -18,8 +18,8 @@
package org.apache.hadoop.hbase.procedure2.store.region;
import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
-import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
+import static org.apache.hadoop.hbase.master.store.LocalStore.PROC_FAMILY;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -30,86 +30,48 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
import org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
-import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
-import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
-import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
- * A procedure store which uses a region to store all the procedures.
+ * A procedure store which uses the master local store to store all the procedures.
*
- * FileSystem layout:
- *
- *
- * hbase
- * |
- * --MasterProcs
- * |
- * --data
- * | |
- * | --/master/procedure/<encoded-region-name> <---- The region data
- * | |
- * | --replay <---- The edits to replay
- * |
- * --WALs
- * |
- * --<master-server-name> <---- The WAL dir for active master
- * |
- * --<master-server-name>-dead <---- The WAL dir dead master
- *
- *
- * We use p:d column to store the serialized protobuf format procedure, and when deleting we will
+ * We use proc:d column to store the serialized protobuf format procedure, and when deleting we will
* first fill the info:proc column with an empty byte array, and then actually delete them in the
* {@link #cleanup()} method. This is because that we need to retain the max procedure id, so we can
* not directly delete a procedure row as we do not know if it is the one with the max procedure id.
@@ -119,58 +81,20 @@ public class RegionProcedureStore extends ProcedureStoreBase {
private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStore.class);
- static final String MAX_WALS_KEY = "hbase.procedure.store.region.maxwals";
-
- private static final int DEFAULT_MAX_WALS = 10;
-
- static final String USE_HSYNC_KEY = "hbase.procedure.store.region.wal.hsync";
-
- static final String MASTER_PROCEDURE_DIR = "MasterProcs";
-
- static final String HFILECLEANER_PLUGINS = "hbase.procedure.store.region.hfilecleaner.plugins";
-
- private static final String REPLAY_EDITS_DIR = "recovered.wals";
-
- private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
-
- static final TableName TABLE_NAME = TableName.valueOf("master:procedure");
-
- static final byte[] FAMILY = Bytes.toBytes("p");
-
static final byte[] PROC_QUALIFIER = Bytes.toBytes("d");
- private static final int REGION_ID = 1;
-
- private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
- .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
-
private final Server server;
- private final DirScanPool cleanerPool;
-
private final LeaseRecovery leaseRecovery;
- // Used to delete the compacted hfiles. Since we put all data on WAL filesystem, it is not
- // possible to move the compacted hfiles to the global hfile archive directory, we have to do it
- // by ourselves.
- private HFileCleaner cleaner;
-
- private WALFactory walFactory;
-
@VisibleForTesting
- HRegion region;
-
- @VisibleForTesting
- RegionFlusherAndCompactor flusherAndCompactor;
-
- @VisibleForTesting
- RegionProcedureStoreWALRoller walRoller;
+ final LocalStore localStore;
private int numThreads;
- public RegionProcedureStore(Server server, DirScanPool cleanerPool, LeaseRecovery leaseRecovery) {
+ public RegionProcedureStore(Server server, LocalStore localStore, LeaseRecovery leaseRecovery) {
this.server = server;
- this.cleanerPool = cleanerPool;
+ this.localStore = localStore;
this.leaseRecovery = leaseRecovery;
}
@@ -183,52 +107,12 @@ public class RegionProcedureStore extends ProcedureStoreBase {
this.numThreads = numThreads;
}
- private void shutdownWAL() {
- if (walFactory != null) {
- try {
- walFactory.shutdown();
- } catch (IOException e) {
- LOG.warn("Failed to shutdown WAL", e);
- }
- }
- }
-
- private void closeRegion(boolean abort) {
- if (region != null) {
- try {
- region.close(abort);
- } catch (IOException e) {
- LOG.warn("Failed to close region", e);
- }
- }
-
- }
-
@Override
public void stop(boolean abort) {
if (!setRunning(false)) {
return;
}
LOG.info("Stopping the Region Procedure Store, isAbort={}", abort);
- if (cleaner != null) {
- cleaner.cancel(abort);
- }
- if (flusherAndCompactor != null) {
- flusherAndCompactor.close();
- }
- // if abort, we shutdown wal first to fail the ongoing updates to the region, and then close the
- // region, otherwise there will be dead lock.
- if (abort) {
- shutdownWAL();
- closeRegion(true);
- } else {
- closeRegion(false);
- shutdownWAL();
- }
-
- if (walRoller != null) {
- walRoller.close();
- }
}
@Override
@@ -242,91 +126,6 @@ public class RegionProcedureStore extends ProcedureStoreBase {
return count;
}
- private WAL createWAL(FileSystem fs, Path rootDir, RegionInfo regionInfo) throws IOException {
- String logName = AbstractFSWALProvider.getWALDirectoryName(server.getServerName().toString());
- Path walDir = new Path(rootDir, logName);
- LOG.debug("WALDir={}", walDir);
- if (fs.exists(walDir)) {
- throw new HBaseIOException(
- "Master procedure store has already created directory at " + walDir);
- }
- if (!fs.mkdirs(walDir)) {
- throw new IOException("Can not create master procedure wal directory " + walDir);
- }
- WAL wal = walFactory.getWAL(regionInfo);
- walRoller.addWAL(wal);
- return wal;
- }
-
- private HRegion bootstrap(Configuration conf, FileSystem fs, Path rootDir) throws IOException {
- RegionInfo regionInfo = RegionInfoBuilder.newBuilder(TABLE_NAME).setRegionId(REGION_ID).build();
- Path tmpTableDir = CommonFSUtils.getTableDir(rootDir, TableName
- .valueOf(TABLE_NAME.getNamespaceAsString(), TABLE_NAME.getQualifierAsString() + "-tmp"));
- if (fs.exists(tmpTableDir) && !fs.delete(tmpTableDir, true)) {
- throw new IOException("Can not delete partial created proc region " + tmpTableDir);
- }
- HRegion.createHRegion(conf, regionInfo, fs, tmpTableDir, TABLE_DESC).close();
- Path tableDir = CommonFSUtils.getTableDir(rootDir, TABLE_NAME);
- if (!fs.rename(tmpTableDir, tableDir)) {
- throw new IOException("Can not rename " + tmpTableDir + " to " + tableDir);
- }
- WAL wal = createWAL(fs, rootDir, regionInfo);
- return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
- null);
- }
-
- private HRegion open(Configuration conf, FileSystem fs, Path rootDir) throws IOException {
- String factoryId = server.getServerName().toString();
- Path tableDir = CommonFSUtils.getTableDir(rootDir, TABLE_NAME);
- Path regionDir =
- fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
- .getPath();
- Path replayEditsDir = new Path(regionDir, REPLAY_EDITS_DIR);
- if (!fs.exists(replayEditsDir) && !fs.mkdirs(replayEditsDir)) {
- throw new IOException("Failed to create replay directory: " + replayEditsDir);
- }
- Path walsDir = new Path(rootDir, HREGION_LOGDIR_NAME);
- for (FileStatus walDir : fs.listStatus(walsDir)) {
- if (!walDir.isDirectory()) {
- continue;
- }
- if (walDir.getPath().getName().startsWith(factoryId)) {
- LOG.warn("This should not happen in real production as we have not created our WAL " +
- "directory yet, ignore if you are running a procedure related UT");
- }
- Path deadWALDir;
- if (!walDir.getPath().getName().endsWith(DEAD_WAL_DIR_SUFFIX)) {
- deadWALDir =
- new Path(walDir.getPath().getParent(), walDir.getPath().getName() + DEAD_WAL_DIR_SUFFIX);
- if (!fs.rename(walDir.getPath(), deadWALDir)) {
- throw new IOException("Can not rename " + walDir + " to " + deadWALDir +
- " when recovering lease of proc store");
- }
- LOG.info("Renamed {} to {} as it is dead", walDir.getPath(), deadWALDir);
- } else {
- deadWALDir = walDir.getPath();
- LOG.info("{} is already marked as dead", deadWALDir);
- }
- for (FileStatus walFile : fs.listStatus(deadWALDir)) {
- Path replayEditsFile = new Path(replayEditsDir, walFile.getPath().getName());
- leaseRecovery.recoverFileLease(fs, walFile.getPath());
- if (!fs.rename(walFile.getPath(), replayEditsFile)) {
- throw new IOException("Can not rename " + walFile.getPath() + " to " + replayEditsFile +
- " when recovering lease of proc store");
- }
- LOG.info("Renamed {} to {}", walFile.getPath(), replayEditsFile);
- }
- LOG.info("Delete empty proc wal dir {}", deadWALDir);
- fs.delete(deadWALDir, true);
- }
- RegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
- WAL wal = createWAL(fs, rootDir, regionInfo);
- conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
- replayEditsDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
- return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
- null);
- }
-
@SuppressWarnings("deprecation")
private static final ImmutableSet> UNSUPPORTED_PROCEDURES =
ImmutableSet.of(RecoverMetaProcedure.class, AssignProcedure.class, UnassignProcedure.class,
@@ -437,8 +236,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
if (maxProcIdSet.longValue() > 0) {
// let's add a fake row to retain the max proc id
- region.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())).addColumn(FAMILY,
- PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
+ localStore.update(r -> r.put(new Put(Bytes.toBytes(maxProcIdSet.longValue()))
+ .addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)));
}
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
@@ -453,46 +252,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
@Override
public void recoverLease() throws IOException {
- LOG.debug("Starting Region Procedure Store lease recovery...");
- Configuration baseConf = server.getConfiguration();
- FileSystem fs = CommonFSUtils.getWALFileSystem(baseConf);
- Path globalWALRootDir = CommonFSUtils.getWALRootDir(baseConf);
- Path rootDir = new Path(globalWALRootDir, MASTER_PROCEDURE_DIR);
- // we will override some configurations so create a new one.
- Configuration conf = new Configuration(baseConf);
- CommonFSUtils.setRootDir(conf, rootDir);
- CommonFSUtils.setWALRootDir(conf, rootDir);
- RegionFlusherAndCompactor.setupConf(conf);
- conf.setInt(AbstractFSWAL.MAX_LOGS, conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS));
- if (conf.get(USE_HSYNC_KEY) != null) {
- conf.set(HRegion.WAL_HSYNC_CONF_KEY, conf.get(USE_HSYNC_KEY));
- }
- conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT, IntMath.ceilingPowerOfTwo(16 * numThreads));
-
- walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
- walRoller.start();
-
- walFactory = new WALFactory(conf, server.getServerName().toString());
- Path tableDir = CommonFSUtils.getTableDir(rootDir, TABLE_NAME);
- if (fs.exists(tableDir)) {
- // load the existing region.
- region = open(conf, fs, rootDir);
- } else {
- // bootstrapping...
- region = bootstrap(conf, fs, rootDir);
- }
- flusherAndCompactor = new RegionFlusherAndCompactor(conf, server, region);
- walRoller.setFlusherAndCompactor(flusherAndCompactor);
- int cleanerInterval = conf.getInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL,
- HMaster.DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
- Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
- if (!fs.mkdirs(archiveDir)) {
- LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will" +
- " be created again when we actually archive the hfiles later, so continue", archiveDir);
- }
- cleaner = new HFileCleaner("RegionProcedureStoreHFileCleaner", cleanerInterval, server, conf,
- fs, archiveDir, HFILECLEANER_PLUGINS, cleanerPool, Collections.emptyMap());
- server.getChoreService().scheduleChore(cleaner);
+ LOG.info("Starting Region Procedure Store lease recovery...");
+ FileSystem fs = CommonFSUtils.getWALFileSystem(server.getConfiguration());
tryMigrate(fs);
}
@@ -501,7 +262,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
List procs = new ArrayList<>();
long maxProcId = 0;
- try (RegionScanner scanner = region.getScanner(new Scan().addColumn(FAMILY, PROC_QUALIFIER))) {
+ try (RegionScanner scanner =
+ localStore.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER))) {
List cells = new ArrayList<>();
boolean moreRows;
do {
@@ -530,7 +292,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
throws IOException {
ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
byte[] row = Bytes.toBytes(proc.getProcId());
- mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, proto.toByteArray()));
+ mutations.add(new Put(row).addColumn(PROC_FAMILY, PROC_QUALIFIER, proto.toByteArray()));
rowsToLock.add(row);
}
@@ -538,7 +300,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
// the proc column with an empty array.
private void serializeDelete(long procId, List mutations, List rowsToLock) {
byte[] row = Bytes.toBytes(procId);
- mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
+ mutations.add(new Put(row).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
rowsToLock.add(row);
}
@@ -571,14 +333,13 @@ public class RegionProcedureStore extends ProcedureStoreBase {
for (Procedure> subProc : subProcs) {
serializePut(subProc, mutations, rowsToLock);
}
- region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+ localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
Arrays.toString(subProcs), e);
throw new UncheckedIOException(e);
}
});
- flusherAndCompactor.onUpdate();
}
@Override
@@ -590,13 +351,12 @@ public class RegionProcedureStore extends ProcedureStoreBase {
for (Procedure> proc : procs) {
serializePut(proc, mutations, rowsToLock);
}
- region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+ localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
throw new UncheckedIOException(e);
}
});
- flusherAndCompactor.onUpdate();
}
@Override
@@ -604,26 +364,24 @@ public class RegionProcedureStore extends ProcedureStoreBase {
runWithoutRpcCall(() -> {
try {
ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
- region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
- proto.toByteArray()));
+ localStore.update(r -> r.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(PROC_FAMILY,
+ PROC_QUALIFIER, proto.toByteArray())));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
throw new UncheckedIOException(e);
}
});
- flusherAndCompactor.onUpdate();
}
@Override
public void delete(long procId) {
try {
- region
- .put(new Put(Bytes.toBytes(procId)).addColumn(FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
+ localStore.update(r -> r.put(
+ new Put(Bytes.toBytes(procId)).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", procId, e);
throw new UncheckedIOException(e);
}
- flusherAndCompactor.onUpdate();
}
@Override
@@ -635,13 +393,12 @@ public class RegionProcedureStore extends ProcedureStoreBase {
for (long subProcId : subProcIds) {
serializeDelete(subProcId, mutations, rowsToLock);
}
- region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+ localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", parentProc,
Arrays.toString(subProcIds), e);
throw new UncheckedIOException(e);
}
- flusherAndCompactor.onUpdate();
}
@Override
@@ -660,12 +417,11 @@ public class RegionProcedureStore extends ProcedureStoreBase {
serializeDelete(procId, mutations, rowsToLock);
}
try {
- region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+ localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", Arrays.toString(procIds), e);
throw new UncheckedIOException(e);
}
- flusherAndCompactor.onUpdate();
}
@Override
@@ -673,7 +429,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
// actually delete the procedures if it is not the one with the max procedure id.
List cells = new ArrayList();
try (RegionScanner scanner =
- region.getScanner(new Scan().addColumn(FAMILY, PROC_QUALIFIER).setReversed(true))) {
+ localStore.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER).setReversed(true))) {
// skip the row with max procedure id
boolean moreRows = scanner.next(cells);
if (cells.isEmpty()) {
@@ -688,7 +444,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
Cell cell = cells.get(0);
cells.clear();
if (cell.getValueLength() == 0) {
- region.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+ localStore.update(r -> r
+ .delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())));
}
}
} catch (IOException e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java
index 1e5c1422d0d..b36e1b5375e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.procedure2.store.region;
-import static org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore.FAMILY;
+import static org.apache.hadoop.hbase.master.store.LocalStore.PROC_FAMILY;
import java.io.PrintStream;
import java.time.Instant;
@@ -102,8 +102,8 @@ public class WALProcedurePrettyPrinter extends AbstractHBaseTool {
String.format(KEY_TMPL, sequenceId, FORMATTER.format(Instant.ofEpochMilli(writeTime))));
for (Cell cell : edit.getCells()) {
Map op = WALPrettyPrinter.toStringMap(cell);
- if (!Bytes.equals(FAMILY, 0, FAMILY.length, cell.getFamilyArray(), cell.getFamilyOffset(),
- cell.getFamilyLength())) {
+ if (!Bytes.equals(PROC_FAMILY, 0, PROC_FAMILY.length, cell.getFamilyArray(),
+ cell.getFamilyOffset(), cell.getFamilyLength())) {
// We could have cells other than procedure edits, for example, a flush marker
WALPrettyPrinter.printCell(out, op, false);
continue;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
index 684d90dd545..450dd6f6e37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
@@ -42,12 +42,11 @@ public final class HFileArchiveUtil {
* @param tableName table name under which the store currently lives
* @param regionName region encoded name under which the store currently lives
* @param familyName name of the family in the store
- * @return {@link Path} to the directory to archive the given store or
- * null if it should not be archived
+ * @return {@link Path} to the directory to archive the given store or null if it should
+ * not be archived
*/
- public static Path getStoreArchivePath(final Configuration conf,
- final TableName tableName,
- final String regionName, final String familyName) throws IOException {
+ public static Path getStoreArchivePath(final Configuration conf, final TableName tableName,
+ final String regionName, final String familyName) throws IOException {
Path tableArchiveDir = getTableArchivePath(conf, tableName);
return HStore.getStoreHomedir(tableArchiveDir, regionName, Bytes.toBytes(familyName));
}
@@ -61,10 +60,8 @@ public final class HFileArchiveUtil {
* @return {@link Path} to the directory to archive the given store or null if it should
* not be archived
*/
- public static Path getStoreArchivePath(Configuration conf,
- RegionInfo region,
- Path tabledir,
- byte[] family) throws IOException {
+ public static Path getStoreArchivePath(Configuration conf, RegionInfo region, Path tabledir,
+ byte[] family) throws IOException {
return getStoreArchivePath(conf, region, family);
}
@@ -84,22 +81,27 @@ public final class HFileArchiveUtil {
}
/**
- * Gets the archive directory under specified root dir. One scenario where this is useful is
- * when WAL and root dir are configured under different file systems,
- * i.e. root dir on S3 and WALs on HDFS.
- * This is mostly useful for archiving recovered edits, when
+ * Gets the archive directory under specified root dir. One scenario where this is useful is when
+ * WAL and root dir are configured under different file systems, i.e. root dir on S3 and WALs on
+ * HDFS. This is mostly useful for archiving recovered edits, when
* hbase.region.archive.recovered.edits is enabled.
* @param rootDir {@link Path} the root dir under which archive path should be created.
* @param region parent region information under which the store currently lives
* @param family name of the family in the store
- * @return {@link Path} to the WAL FS directory to archive the given store
- * or null if it should not be archived
+ * @return {@link Path} to the WAL FS directory to archive the given store or null if it
+ * should not be archived
*/
public static Path getStoreArchivePathForRootDir(Path rootDir, RegionInfo region, byte[] family) {
Path tableArchiveDir = getTableArchivePath(rootDir, region.getTable());
return HStore.getStoreHomedir(tableArchiveDir, region, family);
}
+ public static Path getStoreArchivePathForArchivePath(Path archivePath, RegionInfo region,
+ byte[] family) {
+ Path tableArchiveDir = CommonFSUtils.getTableDir(archivePath, region.getTable());
+ return HStore.getStoreHomedir(tableArchiveDir, region, family);
+ }
+
/**
* Get the archive directory for a given region under the specified table
* @param tableName the table name. Cannot be null.
@@ -107,9 +109,7 @@ public final class HFileArchiveUtil {
* @return {@link Path} to the directory to archive the given region, or null if it
* should not be archived
*/
- public static Path getRegionArchiveDir(Path rootDir,
- TableName tableName,
- Path regiondir) {
+ public static Path getRegionArchiveDir(Path rootDir, TableName tableName, Path regiondir) {
// get the archive directory for a table
Path archiveDir = getTableArchivePath(rootDir, tableName);
@@ -126,8 +126,8 @@ public final class HFileArchiveUtil {
* @return {@link Path} to the directory to archive the given region, or null if it
* should not be archived
*/
- public static Path getRegionArchiveDir(Path rootDir,
- TableName tableName, String encodedRegionName) {
+ public static Path getRegionArchiveDir(Path rootDir, TableName tableName,
+ String encodedRegionName) {
// get the archive directory for a table
Path archiveDir = getTableArchivePath(rootDir, tableName);
return HRegion.getRegionDir(archiveDir, encodedRegionName);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/LocalRegionTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/LocalRegionTestBase.java
new file mode 100644
index 00000000000..70c69a8ec6e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/LocalRegionTestBase.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.After;
+import org.junit.Before;
+
+public class LocalRegionTestBase {
+
+ protected HBaseCommonTestingUtility htu;
+
+ protected LocalRegion region;
+
+ protected ChoreService choreService;
+
+ protected DirScanPool cleanerPool;
+
+ protected static byte[] CF1 = Bytes.toBytes("f1");
+
+ protected static byte[] CF2 = Bytes.toBytes("f2");
+
+ protected static byte[] QUALIFIER = Bytes.toBytes("q");
+
+ protected static String REGION_DIR_NAME = "local";
+
+ protected static TableDescriptor TD =
+ TableDescriptorBuilder.newBuilder(TableName.valueOf("test:local"))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF2)).build();
+
+ protected void configure(Configuration conf) throws IOException {
+ }
+
+ protected void configure(LocalRegionParams params) {
+ }
+
+ protected void postSetUp() throws IOException {
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ htu = new HBaseCommonTestingUtility();
+ htu.getConfiguration().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+ // Runs on local filesystem. Test does not need sync. Turn off checks.
+ htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
+ configure(htu.getConfiguration());
+ choreService = new ChoreService(getClass().getSimpleName());
+ cleanerPool = new DirScanPool(htu.getConfiguration());
+ Server server = mock(Server.class);
+ when(server.getConfiguration()).thenReturn(htu.getConfiguration());
+ when(server.getServerName())
+ .thenReturn(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
+ when(server.getChoreService()).thenReturn(choreService);
+ Path testDir = htu.getDataTestDir();
+ CommonFSUtils.setRootDir(htu.getConfiguration(), testDir);
+ LocalRegionParams params = new LocalRegionParams();
+ params.server(server).regionDirName(REGION_DIR_NAME).tableDescriptor(TD)
+ .flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
+ .flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(4).maxWals(32).useHsync(false)
+ .ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
+ .archivedWalSuffix(LocalStore.ARCHIVED_WAL_SUFFIX)
+ .archivedHFileSuffix(LocalStore.ARCHIVED_HFILE_SUFFIX);
+ configure(params);
+ region = LocalRegion.create(params);
+ postSetUp();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ region.close(true);
+ cleanerPool.shutdownNow();
+ choreService.shutdown();
+ htu.cleanupTestDir();
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionCompaction.java
new file mode 100644
index 00000000000..d1b405819e0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionCompaction.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveMasterLocalStoreHFileCleaner;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestLocalRegionCompaction extends LocalRegionTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestLocalRegionCompaction.class);
+
+ private int compactMin = 4;
+
+ private HFileCleaner hfileCleaner;
+
+ @Override
+ protected void configure(LocalRegionParams params) {
+ params.compactMin(compactMin);
+ }
+
+ @Override
+ protected void postSetUp() throws IOException {
+ Configuration conf = htu.getConfiguration();
+ conf.setLong(TimeToLiveMasterLocalStoreHFileCleaner.TTL_CONF_KEY, 5000);
+ Path testDir = htu.getDataTestDir();
+ FileSystem fs = testDir.getFileSystem(conf);
+ Path globalArchivePath = HFileArchiveUtil.getArchivePath(conf);
+ hfileCleaner = new HFileCleaner(500, new Stoppable() {
+
+ private volatile boolean stopped = false;
+
+ @Override
+ public void stop(String why) {
+ stopped = true;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return stopped;
+ }
+ }, conf, fs, globalArchivePath, cleanerPool);
+ choreService.scheduleChore(hfileCleaner);
+ }
+
+ private int getStorefilesCount() {
+ return region.region.getStores().stream().mapToInt(Store::getStorefilesCount).sum();
+ }
+
+ private void assertFileCount(FileSystem fs, Path storeArchiveDir, int expected)
+ throws IOException {
+ FileStatus[] compactedHFiles = fs.listStatus(storeArchiveDir);
+ assertEquals(expected, compactedHFiles.length);
+ }
+
+ @Test
+ public void test() throws IOException, InterruptedException {
+ for (int i = 0; i < compactMin - 1; i++) {
+ final int index = i;
+ region.update(
+ r -> r.put(new Put(Bytes.toBytes(index)).addColumn(CF1, QUALIFIER, Bytes.toBytes(index))
+ .addColumn(CF2, QUALIFIER, Bytes.toBytes(index))));
+ region.flush(true);
+ }
+ assertEquals(2 * (compactMin - 1), getStorefilesCount());
+ region.update(r -> r.put(new Put(Bytes.toBytes(compactMin - 1)).addColumn(CF1, QUALIFIER,
+ Bytes.toBytes(compactMin - 1))));
+ region.flusherAndCompactor.requestFlush();
+ htu.waitFor(15000, () -> getStorefilesCount() == 2);
+ Path store1ArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(htu.getDataTestDir(),
+ region.region.getRegionInfo(), CF1);
+ Path store2ArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(htu.getDataTestDir(),
+ region.region.getRegionInfo(), CF2);
+ FileSystem fs = store1ArchiveDir.getFileSystem(htu.getConfiguration());
+ // after compaction, the old hfiles should have been compacted
+ htu.waitFor(15000, () -> {
+ try {
+ FileStatus[] fses1 = fs.listStatus(store1ArchiveDir);
+ FileStatus[] fses2 = fs.listStatus(store2ArchiveDir);
+ return fses1 != null && fses1.length == compactMin && fses2 != null &&
+ fses2.length == compactMin - 1;
+ } catch (FileNotFoundException e) {
+ return false;
+ }
+ });
+ // ttl has not expired, so should not delete any files
+ Thread.sleep(1000);
+ FileStatus[] compactedHFiles = fs.listStatus(store1ArchiveDir);
+ assertEquals(compactMin, compactedHFiles.length);
+ assertFileCount(fs, store2ArchiveDir, compactMin - 1);
+ Thread.sleep(2000);
+ // touch one file
+
+ long currentTime = System.currentTimeMillis();
+ fs.setTimes(compactedHFiles[0].getPath(), currentTime, currentTime);
+ Thread.sleep(3000);
+ // only the touched file is still there after clean up
+ FileStatus[] remainingHFiles = fs.listStatus(store1ArchiveDir);
+ assertEquals(1, remainingHFiles.length);
+ assertEquals(compactedHFiles[0].getPath(), remainingHFiles[0].getPath());
+ assertFalse(fs.exists(store2ArchiveDir));
+ Thread.sleep(6000);
+ // the touched file should also be cleaned up and then the cleaner will delete the parent
+ // directory since it is empty.
+ assertFalse(fs.exists(store1ArchiveDir));
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionFlush.java
similarity index 82%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreFlush.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionFlush.java
index 288696ef1d3..48bbba3896d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionFlush.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2.store.region;
+package org.apache.hadoop.hbase.master.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -25,13 +25,17 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -43,17 +47,17 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, MediumTests.class })
-public class TestRegionProcedureStoreFlush {
+public class TestLocalRegionFlush {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestRegionProcedureStoreFlush.class);
+ HBaseClassTestRule.forClass(TestLocalRegionFlush.class);
private Configuration conf;
private HRegion region;
- private RegionFlusherAndCompactor flusher;
+ private LocalRegionFlusherAndCompactor flusher;
private AtomicInteger flushCalled;
@@ -68,6 +72,8 @@ public class TestRegionProcedureStoreFlush {
HStore store = mock(HStore.class);
when(store.getStorefilesCount()).thenReturn(1);
when(region.getStores()).thenReturn(Collections.singletonList(store));
+ when(region.getRegionInfo())
+ .thenReturn(RegionInfoBuilder.newBuilder(TableName.valueOf("hbase:local")).build());
flushCalled = new AtomicInteger(0);
memstoreHeapSize = new AtomicLong(0);
memstoreOffHeapSize = new AtomicLong(0);
@@ -90,8 +96,8 @@ public class TestRegionProcedureStoreFlush {
}
}
- private void initFlusher() {
- flusher = new RegionFlusherAndCompactor(conf, new Abortable() {
+ private void initFlusher(long flushSize, long flushPerChanges, long flushIntervalMs) {
+ flusher = new LocalRegionFlusherAndCompactor(conf, new Abortable() {
@Override
public boolean isAborted() {
@@ -101,13 +107,12 @@ public class TestRegionProcedureStoreFlush {
@Override
public void abort(String why, Throwable e) {
}
- }, region);
+ }, region, flushSize, flushPerChanges, flushIntervalMs, 4, new Path("/tmp"), "");
}
@Test
public void testTriggerFlushBySize() throws IOException, InterruptedException {
- conf.setLong(RegionFlusherAndCompactor.FLUSH_SIZE_KEY, 1024 * 1024);
- initFlusher();
+ initFlusher(1024 * 1024, 1_000_000, TimeUnit.MINUTES.toMillis(15));
memstoreHeapSize.set(1000 * 1024);
flusher.onUpdate();
Thread.sleep(1000);
@@ -130,16 +135,14 @@ public class TestRegionProcedureStoreFlush {
@Test
public void testTriggerFlushByChanges() throws InterruptedException {
- conf.setLong(RegionFlusherAndCompactor.FLUSH_PER_CHANGES_KEY, 10);
- initFlusher();
+ initFlusher(128 * 1024 * 1024, 10, TimeUnit.MINUTES.toMillis(15));
assertTriggerFlushByChanges(10);
assertTriggerFlushByChanges(10);
}
@Test
public void testPeriodicalFlush() throws InterruptedException {
- conf.setLong(RegionFlusherAndCompactor.FLUSH_INTERVAL_MS_KEY, 1000);
- initFlusher();
+ initFlusher(128 * 1024 * 1024, 1_000_000, TimeUnit.SECONDS.toMillis(1));
assertEquals(0, flushCalled.get());
Thread.sleep(1500);
assertEquals(1, flushCalled.get());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionOnTwoFileSystems.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionOnTwoFileSystems.java
new file mode 100644
index 00000000000..94cfe40303f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionOnTwoFileSystems.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestLocalRegionOnTwoFileSystems {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestLocalRegionOnTwoFileSystems.class);
+
+ private static final HBaseCommonTestingUtility HFILE_UTIL = new HBaseCommonTestingUtility();
+
+ private static final HBaseTestingUtility WAL_UTIL = new HBaseTestingUtility();
+
+ private static byte[] CF = Bytes.toBytes("f");
+
+ private static byte[] CQ = Bytes.toBytes("q");
+
+ private static String REGION_DIR_NAME = "local";
+
+ private static TableDescriptor TD =
+ TableDescriptorBuilder.newBuilder(TableName.valueOf("test:local"))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
+
+ private static int COMPACT_MIN = 4;
+
+ private LocalRegion region;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ WAL_UTIL.startMiniCluster(3);
+ Configuration conf = HFILE_UTIL.getConfiguration();
+ conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+ Path rootDir = HFILE_UTIL.getDataTestDir();
+ CommonFSUtils.setRootDir(conf, rootDir);
+ Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
+ FileSystem walFs = WAL_UTIL.getTestFileSystem();
+ CommonFSUtils.setWALRootDir(conf,
+ walRootDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()));
+
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ WAL_UTIL.shutdownMiniDFSCluster();
+ WAL_UTIL.cleanupTestDir();
+ HFILE_UTIL.cleanupTestDir();
+ }
+
+ private LocalRegion createLocalRegion(ServerName serverName) throws IOException {
+ Server server = mock(Server.class);
+ when(server.getConfiguration()).thenReturn(HFILE_UTIL.getConfiguration());
+ when(server.getServerName()).thenReturn(serverName);
+ LocalRegionParams params = new LocalRegionParams();
+ params.server(server).regionDirName(REGION_DIR_NAME).tableDescriptor(TD)
+ .flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
+ .flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(COMPACT_MIN).maxWals(32)
+ .useHsync(false).ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
+ .archivedWalSuffix(LocalStore.ARCHIVED_WAL_SUFFIX)
+ .archivedHFileSuffix(LocalStore.ARCHIVED_HFILE_SUFFIX);
+ return LocalRegion.create(params);
+ }
+
+ @Before
+ public void setUpBeforeTest() throws IOException {
+ Path rootDir = HFILE_UTIL.getDataTestDir();
+ FileSystem fs = rootDir.getFileSystem(HFILE_UTIL.getConfiguration());
+ fs.delete(rootDir, true);
+ Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
+ FileSystem walFs = WAL_UTIL.getTestFileSystem();
+ walFs.delete(walRootDir, true);
+ region = createLocalRegion(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
+ }
+
+ @After
+ public void tearDownAfterTest() {
+ region.close(true);
+ }
+
+ private int getStorefilesCount() {
+ return Iterables.getOnlyElement(region.region.getStores()).getStorefilesCount();
+ }
+
+ @Test
+ public void testFlushAndCompact() throws Exception {
+ for (int i = 0; i < COMPACT_MIN - 1; i++) {
+ final int index = i;
+ region
+ .update(r -> r.put(new Put(Bytes.toBytes(index)).addColumn(CF, CQ, Bytes.toBytes(index))));
+ region.flush(true);
+ }
+ region.requestRollAll();
+ region.waitUntilWalRollFinished();
+ region.update(r -> r.put(
+ new Put(Bytes.toBytes(COMPACT_MIN - 1)).addColumn(CF, CQ, Bytes.toBytes(COMPACT_MIN - 1))));
+ region.flusherAndCompactor.requestFlush();
+
+ HFILE_UTIL.waitFor(15000, () -> getStorefilesCount() == 1);
+
+ // make sure the archived hfiles are on the root fs
+ Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
+ HFILE_UTIL.getDataTestDir(), region.region.getRegionInfo(), CF);
+ FileSystem rootFs = storeArchiveDir.getFileSystem(HFILE_UTIL.getConfiguration());
+ HFILE_UTIL.waitFor(15000, () -> {
+ try {
+ FileStatus[] fses = rootFs.listStatus(storeArchiveDir);
+ return fses != null && fses.length == COMPACT_MIN;
+ } catch (FileNotFoundException e) {
+ return false;
+ }
+ });
+
+ // make sure the archived wal files are on the wal fs
+ Path walArchiveDir = new Path(CommonFSUtils.getWALRootDir(HFILE_UTIL.getConfiguration()),
+ HConstants.HREGION_OLDLOGDIR_NAME);
+ HFILE_UTIL.waitFor(15000, () -> {
+ try {
+ FileStatus[] fses = WAL_UTIL.getTestFileSystem().listStatus(walArchiveDir);
+ return fses != null && fses.length == 1;
+ } catch (FileNotFoundException e) {
+ return false;
+ }
+ });
+ }
+
+ @Test
+ public void testRecovery() throws IOException {
+ int countPerRound = 100;
+ for (int round = 0; round < 5; round++) {
+ for (int i = 0; i < countPerRound; i++) {
+ int row = round * countPerRound + i;
+ Put put = new Put(Bytes.toBytes(row)).addColumn(CF, CQ, Bytes.toBytes(row));
+ region.update(r -> r.put(put));
+ }
+ region.close(true);
+ region = createLocalRegion(
+ ServerName.valueOf("localhost", 12345, System.currentTimeMillis() + round + 1));
+ try (RegionScanner scanner = region.getScanner(new Scan())) {
+ List cells = new ArrayList<>();
+ boolean moreValues = true;
+ for (int i = 0; i < (round + 1) * countPerRound; i++) {
+ assertTrue(moreValues);
+ moreValues = scanner.next(cells);
+ assertEquals(1, cells.size());
+ Result result = Result.create(cells);
+ cells.clear();
+ assertEquals(i, Bytes.toInt(result.getRow()));
+ assertEquals(i, Bytes.toInt(result.getValue(CF, CQ)));
+ }
+ assertFalse(moreValues);
+ }
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionWALCleaner.java
similarity index 56%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionWALCleaner.java
index b4290dad3fe..052b1e6f1d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionWALCleaner.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2.store.region;
+package org.apache.hadoop.hbase.master.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -26,62 +26,39 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
-import org.apache.hadoop.hbase.master.cleaner.TimeToLiveProcedureWALCleaner;
-import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveMasterLocalStoreWALCleaner;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, MediumTests.class })
-public class TestRegionProcedureStoreWALCleaner {
+public class TestLocalRegionWALCleaner extends LocalRegionTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestRegionProcedureStoreWALCleaner.class);
+ HBaseClassTestRule.forClass(TestLocalRegionWALCleaner.class);
- private HBaseCommonTestingUtility htu;
-
- private FileSystem fs;
-
- private RegionProcedureStore store;
-
- private ChoreService choreService;
-
- private DirScanPool dirScanPool;
+ private static long TTL_MS = 5000;
private LogCleaner logCleaner;
private Path globalWALArchiveDir;
- @Before
- public void setUp() throws IOException {
- htu = new HBaseCommonTestingUtility();
+ @Override
+ protected void postSetUp() throws IOException {
Configuration conf = htu.getConfiguration();
- conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
- // Runs on local filesystem. Test does not need sync. Turn off checks.
- htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
+ conf.setLong(TimeToLiveMasterLocalStoreWALCleaner.TTL_CONF_KEY, TTL_MS);
Path testDir = htu.getDataTestDir();
- fs = testDir.getFileSystem(conf);
- CommonFSUtils.setWALRootDir(conf, testDir);
+ FileSystem fs = testDir.getFileSystem(conf);
globalWALArchiveDir = new Path(testDir, HConstants.HREGION_OLDLOGDIR_NAME);
- choreService = new ChoreService("Region-Procedure-Store");
- dirScanPool = new DirScanPool(conf);
- conf.setLong(TimeToLiveProcedureWALCleaner.TTL_CONF_KEY, 5000);
- conf.setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 1000);
logCleaner = new LogCleaner(1000, new Stoppable() {
private volatile boolean stopped = false;
@@ -95,30 +72,21 @@ public class TestRegionProcedureStoreWALCleaner {
public boolean isStopped() {
return stopped;
}
- }, conf, fs, globalWALArchiveDir, dirScanPool);
+ }, conf, fs, globalWALArchiveDir, cleanerPool);
choreService.scheduleChore(logCleaner);
- store = RegionProcedureStoreTestHelper.createStore(conf, choreService, dirScanPool,
- new LoadCounter());
- }
-
- @After
- public void tearDown() throws IOException {
- store.stop(true);
- logCleaner.cancel();
- dirScanPool.shutdownNow();
- choreService.shutdown();
- htu.cleanupTestDir();
}
@Test
public void test() throws IOException, InterruptedException {
- RegionProcedureStoreTestProcedure proc = new RegionProcedureStoreTestProcedure();
- store.insert(proc, null);
- store.region.flush(true);
+ region
+ .update(r -> r.put(new Put(Bytes.toBytes(1)).addColumn(CF1, QUALIFIER, Bytes.toBytes(1))));
+ region.flush(true);
+ Path testDir = htu.getDataTestDir();
+ FileSystem fs = testDir.getFileSystem(htu.getConfiguration());
// no archived wal files yet
assertFalse(fs.exists(globalWALArchiveDir));
- store.walRoller.requestRollAll();
- store.walRoller.waitUntilWalRollFinished();
+ region.requestRollAll();
+ region.waitUntilWalRollFinished();
// should have one
FileStatus[] files = fs.listStatus(globalWALArchiveDir);
assertEquals(1, files.length);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
index fa8b1511b4f..2822fa9f9ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
-import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
@@ -47,19 +47,15 @@ public class RegionProcedureStorePerformanceEvaluation
private final ServerName serverName =
ServerName.valueOf("localhost", 12345, System.currentTimeMillis());
- private final ChoreService choreService;
-
private volatile boolean abort = false;
public MockServer(Configuration conf) {
this.conf = conf;
- this.choreService = new ChoreService("Cleaner-Chore-Service");
}
@Override
public void abort(String why, Throwable e) {
abort = true;
- choreService.shutdown();
}
@Override
@@ -69,7 +65,6 @@ public class RegionProcedureStorePerformanceEvaluation
@Override
public void stop(String why) {
- choreService.shutdown();
}
@Override
@@ -114,11 +109,11 @@ public class RegionProcedureStorePerformanceEvaluation
@Override
public ChoreService getChoreService() {
- return choreService;
+ throw new UnsupportedOperationException();
}
}
- private DirScanPool cleanerPool;
+ private LocalStore localStore;
@Override
protected RegionProcedureStore createProcedureStore(Path storeDir) throws IOException {
@@ -132,10 +127,11 @@ public class RegionProcedureStorePerformanceEvaluation
int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
initialCountPercentage, null);
- conf.setBoolean(RegionProcedureStore.USE_HSYNC_KEY, "hsync".equals(syncType));
+ conf.setBoolean(LocalStore.USE_HSYNC_KEY, "hsync".equals(syncType));
CommonFSUtils.setRootDir(conf, storeDir);
- cleanerPool = new DirScanPool(conf);
- return new RegionProcedureStore(new MockServer(conf), cleanerPool, (fs, apth) -> {
+ MockServer server = new MockServer(conf);
+ localStore = LocalStore.create(server);
+ return new RegionProcedureStore(server, localStore, (fs, apth) -> {
});
}
@@ -152,7 +148,7 @@ public class RegionProcedureStorePerformanceEvaluation
@Override
protected void postStop(RegionProcedureStore store) throws IOException {
- cleanerPool.shutdownNow();
+ localStore.close(true);
}
public static void main(String[] args) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
index dde04a4006b..3dd00d261fe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.procedure2.store.region;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -30,42 +30,35 @@ import org.junit.After;
import org.junit.Before;
/**
- * This runs on local filesystem. hsync and hflush are not supported. May lose data!
- * Only use where data loss is not of consequence.
+ * This runs on local filesystem. hsync and hflush are not supported. May lose data! Only use where
+ * data loss is not of consequence.
*/
public class RegionProcedureStoreTestBase {
protected HBaseCommonTestingUtility htu;
+ protected LocalStore localStore;
+
protected RegionProcedureStore store;
- protected ChoreService choreService;
-
- protected DirScanPool cleanerPool;
-
- protected void configure(Configuration conf) {
- }
-
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
- htu.getConfiguration().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+ Configuration conf = htu.getConfiguration();
+ conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
// Runs on local filesystem. Test does not need sync. Turn off checks.
- htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
- configure(htu.getConfiguration());
+ conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
Path testDir = htu.getDataTestDir();
- CommonFSUtils.setWALRootDir(htu.getConfiguration(), testDir);
- choreService = new ChoreService(getClass().getSimpleName());
- cleanerPool = new DirScanPool(htu.getConfiguration());
- store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
- cleanerPool, new LoadCounter());
+ CommonFSUtils.setRootDir(htu.getConfiguration(), testDir);
+ Server server = RegionProcedureStoreTestHelper.mockServer(conf);
+ localStore = LocalStore.create(server);
+ store = RegionProcedureStoreTestHelper.createStore(server, localStore, new LoadCounter());
}
@After
public void tearDown() throws IOException {
store.stop(true);
- cleanerPool.shutdownNow();
- choreService.shutdown();
+ localStore.close(true);
htu.cleanupTestDir();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
index 5497b8a5439..396574ec784 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
@@ -24,10 +24,9 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
@@ -36,14 +35,17 @@ final class RegionProcedureStoreTestHelper {
private RegionProcedureStoreTestHelper() {
}
- static RegionProcedureStore createStore(Configuration conf, ChoreService choreService,
- DirScanPool cleanerPool, ProcedureLoader loader) throws IOException {
+ static Server mockServer(Configuration conf) {
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(conf);
when(server.getServerName())
.thenReturn(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
- when(server.getChoreService()).thenReturn(choreService);
- RegionProcedureStore store = new RegionProcedureStore(server, cleanerPool, new LeaseRecovery() {
+ return server;
+ }
+
+ static RegionProcedureStore createStore(Server server, LocalStore localStore,
+ ProcedureLoader loader) throws IOException {
+ RegionProcedureStore store = new RegionProcedureStore(server, localStore, new LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestHFileProcedurePrettyPrinter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestHFileProcedurePrettyPrinter.java
index 27d20a58d57..684da19c161 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestHFileProcedurePrettyPrinter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestHFileProcedurePrettyPrinter.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -93,16 +94,15 @@ public class TestHFileProcedurePrettyPrinter extends RegionProcedureStoreTestBas
store.insert(proc, null);
procs.add(proc);
}
- store.region.flush(true);
+ store.localStore.flush(true);
for (int i = 0; i < 5; i++) {
store.delete(procs.get(i).getProcId());
}
- store.region.flush(true);
+ store.localStore.flush(true);
store.cleanup();
- store.region.flush(true);
+ store.localStore.flush(true);
Path tableDir = CommonFSUtils.getTableDir(
- new Path(htu.getDataTestDir(), RegionProcedureStore.MASTER_PROCEDURE_DIR),
- RegionProcedureStore.TABLE_NAME);
+ new Path(htu.getDataTestDir(), LocalStore.MASTER_STORE_DIR), LocalStore.TABLE_NAME);
FileSystem fs = tableDir.getFileSystem(htu.getConfiguration());
Path regionDir =
fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
index 178e78d23f0..1f4ceb43596 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
@@ -126,24 +126,24 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
assertEquals(1, loader.getRunnableCount());
// the row should still be there
- assertTrue(store.region
+ assertTrue(store.localStore
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
- assertTrue(store.region
+ assertTrue(store.localStore
.get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
// proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc
// id
store.cleanup();
- assertTrue(store.region
+ assertTrue(store.localStore
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
- assertFalse(store.region
+ assertFalse(store.localStore
.get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
store.insert(proc4, null);
store.cleanup();
// proc3 should also be deleted as now proc4 holds the max proc id
- assertFalse(store.region
+ assertFalse(store.localStore
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
}
@@ -227,7 +227,7 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
@Override
public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
- String error) {
+ String error) {
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java
deleted file mode 100644
index 15682bb8cff..00000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.procedure2.store.region;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
-
-@Category({ MasterTests.class, MediumTests.class })
-public class TestRegionProcedureStoreCompaction extends RegionProcedureStoreTestBase {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestRegionProcedureStoreCompaction.class);
-
- private int compactMin = 4;
-
- @Override
- protected void configure(Configuration conf) {
- conf.setInt(RegionFlusherAndCompactor.COMPACT_MIN_KEY, compactMin);
- conf.setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 500);
- conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 5000);
- }
-
- private int getStorefilesCount() {
- return Iterables.getOnlyElement(store.region.getStores()).getStorefilesCount();
- }
-
- @Test
- public void test() throws IOException, InterruptedException {
- for (int i = 0; i < compactMin - 1; i++) {
- store.insert(new RegionProcedureStoreTestProcedure(), null);
- store.region.flush(true);
- }
- assertEquals(compactMin - 1, getStorefilesCount());
- store.insert(new RegionProcedureStoreTestProcedure(), null);
- store.flusherAndCompactor.requestFlush();
- htu.waitFor(15000, () -> getStorefilesCount() == 1);
- Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
- new Path(htu.getDataTestDir(), RegionProcedureStore.MASTER_PROCEDURE_DIR),
- store.region.getRegionInfo(), RegionProcedureStore.FAMILY);
- FileSystem fs = storeArchiveDir.getFileSystem(htu.getConfiguration());
- // after compaction, the old hfiles should have been compacted
- htu.waitFor(15000, () -> {
- try {
- FileStatus[] fses = fs.listStatus(storeArchiveDir);
- return fses != null && fses.length == compactMin;
- } catch (FileNotFoundException e) {
- return false;
- }
- });
- // ttl has not expired, so should not delete any files
- Thread.sleep(1000);
- FileStatus[] compactedHFiles = fs.listStatus(storeArchiveDir);
- assertEquals(4, compactedHFiles.length);
- Thread.sleep(2000);
- // touch one file
- long currentTime = System.currentTimeMillis();
- fs.setTimes(compactedHFiles[0].getPath(), currentTime, currentTime);
- Thread.sleep(3000);
- // only the touched file is still there after clean up
- FileStatus[] remainingHFiles = fs.listStatus(storeArchiveDir);
- assertEquals(1, remainingHFiles.length);
- assertEquals(compactedHFiles[0].getPath(), remainingHFiles[0].getPath());
- Thread.sleep(6000);
- // the touched file should also be cleaned up and then the cleaner will delete the parent
- // directory since it is empty.
- assertFalse(fs.exists(storeArchiveDir));
- }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
index 2512d0f284b..effa751e21b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
@@ -32,14 +32,14 @@ import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
-import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@@ -65,14 +65,14 @@ public class TestRegionProcedureStoreMigration {
private HBaseCommonTestingUtility htu;
+ private Server server;
+
+ private LocalStore localStore;
+
private RegionProcedureStore store;
private WALProcedureStore walStore;
- private ChoreService choreService;
-
- private DirScanPool cleanerPool;
-
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
@@ -81,7 +81,7 @@ public class TestRegionProcedureStoreMigration {
// Runs on local filesystem. Test does not need sync. Turn off checks.
htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
Path testDir = htu.getDataTestDir();
- CommonFSUtils.setWALRootDir(conf, testDir);
+ CommonFSUtils.setRootDir(conf, testDir);
walStore = new WALProcedureStore(conf, new LeaseRecovery() {
@Override
@@ -91,8 +91,8 @@ public class TestRegionProcedureStoreMigration {
walStore.start(1);
walStore.recoverLease();
walStore.load(new LoadCounter());
- choreService = new ChoreService(getClass().getSimpleName());
- cleanerPool = new DirScanPool(htu.getConfiguration());
+ server = RegionProcedureStoreTestHelper.mockServer(conf);
+ localStore = LocalStore.create(server);
}
@After
@@ -100,9 +100,8 @@ public class TestRegionProcedureStoreMigration {
if (store != null) {
store.stop(true);
}
+ localStore.close(true);
walStore.stop(true);
- cleanerPool.shutdownNow();
- choreService.shutdown();
htu.cleanupTestDir();
}
@@ -121,30 +120,29 @@ public class TestRegionProcedureStoreMigration {
SortedSet loadedProcs =
new TreeSet<>((p1, p2) -> Long.compare(p1.getProcId(), p2.getProcId()));
MutableLong maxProcIdSet = new MutableLong(0);
- store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
- cleanerPool, new ProcedureLoader() {
+ store = RegionProcedureStoreTestHelper.createStore(server, localStore, new ProcedureLoader() {
- @Override
- public void setMaxProcId(long maxProcId) {
- maxProcIdSet.setValue(maxProcId);
- }
+ @Override
+ public void setMaxProcId(long maxProcId) {
+ maxProcIdSet.setValue(maxProcId);
+ }
- @Override
- public void load(ProcedureIterator procIter) throws IOException {
- while (procIter.hasNext()) {
- RegionProcedureStoreTestProcedure proc =
- (RegionProcedureStoreTestProcedure) procIter.next();
- loadedProcs.add(proc);
- }
+ @Override
+ public void load(ProcedureIterator procIter) throws IOException {
+ while (procIter.hasNext()) {
+ RegionProcedureStoreTestProcedure proc =
+ (RegionProcedureStoreTestProcedure) procIter.next();
+ loadedProcs.add(proc);
}
+ }
- @Override
- public void handleCorrupted(ProcedureIterator procIter) throws IOException {
- if (procIter.hasNext()) {
- fail("Found corrupted procedures");
- }
+ @Override
+ public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+ if (procIter.hasNext()) {
+ fail("Found corrupted procedures");
}
- });
+ }
+ });
assertEquals(10, maxProcIdSet.longValue());
assertEquals(5, loadedProcs.size());
int procId = 1;
@@ -168,8 +166,7 @@ public class TestRegionProcedureStoreMigration {
walStore.stop(true);
try {
- store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
- cleanerPool, new LoadCounter());
+ store = RegionProcedureStoreTestHelper.createStore(server, localStore, new LoadCounter());
fail("Should fail since AssignProcedure is not supported");
} catch (HBaseIOException e) {
assertThat(e.getMessage(), startsWith("Unsupported"));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestWALProcedurePrettyPrinter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestWALProcedurePrettyPrinter.java
index 6ad16c985c9..3fde2c04be4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestWALProcedurePrettyPrinter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestWALProcedurePrettyPrinter.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.util.ToolRunner;
@@ -57,18 +58,18 @@ public class TestWALProcedurePrettyPrinter extends RegionProcedureStoreTestBase
store.insert(proc, null);
procs.add(proc);
}
- store.region.flush(true);
+ store.localStore.flush(true);
for (int i = 0; i < 5; i++) {
store.delete(procs.get(i).getProcId());
}
store.cleanup();
Path walParentDir = new Path(htu.getDataTestDir(),
- RegionProcedureStore.MASTER_PROCEDURE_DIR + "/" + HConstants.HREGION_LOGDIR_NAME);
+ LocalStore.MASTER_STORE_DIR + "/" + HConstants.HREGION_LOGDIR_NAME);
FileSystem fs = walParentDir.getFileSystem(htu.getConfiguration());
Path walDir = fs.listStatus(walParentDir)[0].getPath();
Path walFile = fs.listStatus(walDir)[0].getPath();
- store.walRoller.requestRollAll();
- store.walRoller.waitUntilWalRollFinished();
+ store.localStore.requestRollAll();
+ store.localStore.waitUntilWalRollFinished();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintStream out = new PrintStream(bos);
WALProcedurePrettyPrinter printer = new WALProcedurePrettyPrinter(out);
| | | |