diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index b89488a60df..e4c4eb33064 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -678,6 +678,10 @@ public abstract class AbstractFSWAL implements WAL { // NewPath could be equal to oldPath if replaceWriter fails. newPath = replaceWriter(oldPath, newPath, nextWriter); tellListenersAboutPostLogRoll(oldPath, newPath); + if (LOG.isDebugEnabled()) { + LOG.debug("Create new " + getClass().getSimpleName() + " writer with pipeline: " + + Arrays.toString(getPipeline())); + } // Can we delete any of the old log files? if (getNumRolledLogFiles() > 0) { cleanOldLogs(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 2f5c299ef96..e495e99d57d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -82,7 +82,7 @@ public abstract class AbstractFSWALProvider> implemen * @param factory factory that made us, identity used for FS layout. may not be null * @param conf may not be null * @param listeners may be null - * @param providerId differentiate between providers from one facotry, used for FS layout. may be + * @param providerId differentiate between providers from one factory, used for FS layout. may be * null */ @Override @@ -109,7 +109,7 @@ public abstract class AbstractFSWALProvider> implemen } @Override - public WAL getWAL(byte[] identifier, byte[] namespace) throws IOException { + public T getWAL(byte[] identifier, byte[] namespace) throws IOException { T walCopy = wal; if (walCopy == null) { // only lock when need to create wal, and need to lock since diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index 0aeaccf2d86..b447e94b0d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -18,34 +18,31 @@ */ package org.apache.hadoop.hbase.wal; -import static org.apache.hadoop.hbase.wal.DefaultWALProvider.META_WAL_PROVIDER_ID; -import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; import java.io.IOException; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.regionserver.wal.FSHLog; -import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; - +import org.apache.hadoop.hbase.classification.InterfaceAudience; // imports for classes still in regionserver.wal import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.IdReadWriteLock; /** * A WAL Provider that returns a WAL per group of regions. * + * This provider follows the decorator pattern and mainly holds the logic for WAL grouping. + * WAL creation/roll/close is delegated to {@link #DELEGATE_PROVIDER} + * * Region grouping is handled via {@link RegionGroupingStrategy} and can be configured via the * property "hbase.wal.regiongrouping.strategy". Current strategy choices are *
    @@ -57,7 +54,7 @@ import org.apache.hadoop.hbase.util.FSUtils; * Optionally, a FQCN to a custom implementation may be given. */ @InterfaceAudience.Private -class RegionGroupingProvider implements WALProvider { +public class RegionGroupingProvider implements WALProvider { private static final Log LOG = LogFactory.getLog(RegionGroupingProvider.class); /** @@ -124,22 +121,23 @@ class RegionGroupingProvider implements WALProvider { public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy"; public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name(); + /** delegate provider for WAL creation/roll/close */ + public static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate.provider"; + public static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider + .name(); + private static final String META_WAL_GROUP_NAME = "meta"; - /** A group-wal mapping, recommended to make sure one-one rather than many-one mapping */ - protected final Map cached = new HashMap(); - /** Stores unique wals generated by this RegionGroupingProvider */ - private final Set logs = Collections.synchronizedSet(new HashSet()); + /** A group-provider mapping, make sure one-one rather than many-one mapping */ + private final ConcurrentMap cached = new ConcurrentHashMap<>(); - /** - * we synchronize on walCacheLock to prevent wal recreation in different threads - */ - final Object walCacheLock = new Object(); + private final IdReadWriteLock createLock = new IdReadWriteLock(); - protected RegionGroupingStrategy strategy = null; + private RegionGroupingStrategy strategy = null; + private WALFactory factory = null; private List listeners = null; private String providerId = null; - private Configuration conf = null; + private Class providerClass; @Override public void init(final WALFactory factory, final Configuration conf, @@ -147,6 +145,7 @@ class RegionGroupingProvider implements WALProvider { if (null != strategy) { throw new IllegalStateException("WALProvider.init should only be called once."); } + this.factory = factory; this.listeners = null == listeners ? null : Collections.unmodifiableList(listeners); StringBuilder sb = new StringBuilder().append(factory.factoryId); if (providerId != null) { @@ -158,45 +157,33 @@ class RegionGroupingProvider implements WALProvider { } this.providerId = sb.toString(); this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY); - this.conf = conf; + this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER); } - /** - * Populate the cache for this group. - */ - FSHLog populateCache(String groupName) throws IOException { - boolean isMeta = META_WAL_PROVIDER_ID.equals(providerId); - String hlogPrefix; - List listeners; - if (isMeta) { - hlogPrefix = this.providerId; - // don't watch log roll for meta - listeners = Collections. singletonList(new MetricsWAL()); + private WALProvider createProvider(String group) throws IOException { + if (META_WAL_PROVIDER_ID.equals(providerId)) { + return factory.createProvider(providerClass, listeners, META_WAL_PROVIDER_ID); } else { - hlogPrefix = groupName; - listeners = this.listeners; + return factory.createProvider(providerClass, listeners, group); } - FSHLog log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf), - DefaultWALProvider.getWALDirectoryName(providerId), HConstants.HREGION_OLDLOGDIR_NAME, - conf, listeners, true, hlogPrefix, isMeta ? META_WAL_PROVIDER_ID : null); - cached.put(groupName, log); - logs.add(log); - return log; } private WAL getWAL(final String group) throws IOException { - WAL log = cached.get(group); - if (null == log) { - // only lock when need to create wal, and need to lock since - // creating hlog on fs is time consuming - synchronized (this.walCacheLock) { - log = cached.get(group);// check again - if (null == log) { - log = populateCache(group); + WALProvider provider = cached.get(group); + if (provider == null) { + Lock lock = createLock.getLock(group.hashCode()).writeLock(); + lock.lock(); + try { + provider = cached.get(group); + if (provider == null) { + provider = createProvider(group); + cached.put(group, provider); } + } finally { + lock.unlock(); } } - return log; + return provider.getWAL(null, null); } @Override @@ -214,15 +201,15 @@ class RegionGroupingProvider implements WALProvider { public void shutdown() throws IOException { // save the last exception and rethrow IOException failure = null; - synchronized (logs) { - for (FSHLog wal : logs) { - try { - wal.shutdown(); - } catch (IOException exception) { - LOG.error("Problem shutting down log '" + wal + "': " + exception.getMessage()); - LOG.debug("Details of problem shutting down log '" + wal + "'", exception); - failure = exception; + for (WALProvider provider: cached.values()) { + try { + provider.shutdown(); + } catch (IOException e) { + LOG.error("Problem shutting down wal provider '" + provider + "': " + e.getMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug("Details of problem shutting down wal provider '" + provider + "'", e); } + failure = e; } } if (failure != null) { @@ -234,15 +221,15 @@ class RegionGroupingProvider implements WALProvider { public void close() throws IOException { // save the last exception and rethrow IOException failure = null; - synchronized (logs) { - for (FSHLog wal : logs) { - try { - wal.close(); - } catch (IOException exception) { - LOG.error("Problem closing log '" + wal + "': " + exception.getMessage()); - LOG.debug("Details of problem closing wal '" + wal + "'", exception); - failure = exception; + for (WALProvider provider : cached.values()) { + try { + provider.close(); + } catch (IOException e) { + LOG.error("Problem closing wal provider '" + provider + "': " + e.getMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug("Details of problem closing wal provider '" + provider + "'", e); } + failure = e; } } if (failure != null) { @@ -262,10 +249,8 @@ class RegionGroupingProvider implements WALProvider { @Override public long getNumLogFiles() { long numLogFiles = 0; - synchronized (logs) { - for (FSHLog wal : logs) { - numLogFiles += wal.getNumLogFiles(); - } + for (WALProvider provider : cached.values()) { + numLogFiles += provider.getNumLogFiles(); } return numLogFiles; } @@ -273,12 +258,9 @@ class RegionGroupingProvider implements WALProvider { @Override public long getLogFileSize() { long logFileSize = 0; - synchronized (logs) { - for (FSHLog wal : logs) { - logFileSize += wal.getLogFileSize(); - } + for (WALProvider provider : cached.values()) { + logFileSize += provider.getLogFileSize(); } return logFileSize; } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index a2761df01c3..a9c17b5cba6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -127,37 +127,45 @@ public class WALFactory { factoryId = SINGLETON_ID; } - /** - * instantiate a provider from a config property. - * requires conf to have already been set (as well as anything the provider might need to read). - */ - WALProvider getProvider(final String key, final String defaultValue, - final List listeners, final String providerId) throws IOException { - Class clazz; + Class getProviderClass(String key, String defaultValue) { try { - clazz = Providers.valueOf(conf.get(key, defaultValue)).clazz; + return Providers.valueOf(conf.get(key, defaultValue)).clazz; } catch (IllegalArgumentException exception) { // Fall back to them specifying a class name // Note that the passed default class shouldn't actually be used, since the above only fails // when there is a config value present. - clazz = conf.getClass(key, DefaultWALProvider.class, WALProvider.class); + return conf.getClass(key, DefaultWALProvider.class, WALProvider.class); } + } + + WALProvider createProvider(Class clazz, + List listeners, String providerId) throws IOException { LOG.info("Instantiating WALProvider of type " + clazz); try { final WALProvider result = clazz.newInstance(); result.init(this, conf, listeners, providerId); return result; } catch (InstantiationException exception) { - LOG.error("couldn't set up WALProvider, check config key " + key); + LOG.error("couldn't set up WALProvider, the configured class is " + clazz); LOG.debug("Exception details for failure to load WALProvider.", exception); throw new IOException("couldn't set up WALProvider", exception); } catch (IllegalAccessException exception) { - LOG.error("couldn't set up WALProvider, check config key " + key); + LOG.error("couldn't set up WALProvider, the configured class is " + clazz); LOG.debug("Exception details for failure to load WALProvider.", exception); throw new IOException("couldn't set up WALProvider", exception); } } + /** + * instantiate a provider from a config property. + * requires conf to have already been set (as well as anything the provider might need to read). + */ + WALProvider getProvider(final String key, final String defaultValue, + final List listeners, final String providerId) throws IOException { + Class clazz = getProviderClass(key, defaultValue); + return createProvider(clazz, listeners, providerId); + } + /** * @param conf must not be null, will keep a reference to read params in later reader/writer * instances. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java new file mode 100644 index 00000000000..debe8a1de4e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.multiwal; + +import org.apache.hadoop.hbase.replication.TestReplicationEndpoint; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.wal.RegionGroupingProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestReplicationEndpointWithMultipleAsyncWAL extends TestReplicationEndpoint { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1.set(WALFactory.WAL_PROVIDER, "multiwal"); + conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs"); + TestReplicationEndpoint.setUpBeforeClass(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java index cd7873a25ff..8f350f297a6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.multiwal; import org.apache.hadoop.hbase.replication.TestReplicationEndpoint; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.wal.RegionGroupingProvider; import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; @@ -29,6 +30,7 @@ public class TestReplicationEndpointWithMultipleWAL extends TestReplicationEndpo @BeforeClass public static void setUpBeforeClass() throws Exception { conf1.set(WALFactory.WAL_PROVIDER, "multiwal"); + conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem"); TestReplicationEndpoint.setUpBeforeClass(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java new file mode 100644 index 00000000000..416cda7f2f6 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.multiwal; + +import org.apache.hadoop.hbase.replication.TestReplicationKillMasterRSCompressed; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.wal.RegionGroupingProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ReplicationTests.class, LargeTests.class}) +public class TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL extends + TestReplicationKillMasterRSCompressed { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1.set(WALFactory.WAL_PROVIDER, "multiwal"); + conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs"); + TestReplicationKillMasterRSCompressed.setUpBeforeClass(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java index 0b2c2ccac2a..18067762cc0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.multiwal; import org.apache.hadoop.hbase.replication.TestReplicationKillMasterRSCompressed; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.wal.RegionGroupingProvider; import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; @@ -30,6 +31,7 @@ public class TestReplicationKillMasterRSCompressedWithMultipleWAL extends @BeforeClass public static void setUpBeforeClass() throws Exception { conf1.set(WALFactory.WAL_PROVIDER, "multiwal"); + conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem"); TestReplicationKillMasterRSCompressed.setUpBeforeClass(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java new file mode 100644 index 00000000000..bb91aaf7d17 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.multiwal; + +import org.apache.hadoop.hbase.replication.TestReplicationBase; +import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.wal.RegionGroupingProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestReplicationSyncUpToolWithMultipleAsyncWAL extends TestReplicationSyncUpTool { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1.set(WALFactory.WAL_PROVIDER, "multiwal"); + conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs"); + TestReplicationBase.setUpBeforeClass(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java index a8e455cab9e..7329a902e9e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.replication.TestReplicationBase; import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.wal.RegionGroupingProvider; import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; @@ -30,6 +31,7 @@ public class TestReplicationSyncUpToolWithMultipleWAL extends TestReplicationSyn @BeforeClass public static void setUpBeforeClass() throws Exception { conf1.set(WALFactory.WAL_PROVIDER, "multiwal"); + conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem"); TestReplicationBase.setUpBeforeClass(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java index 2044f82e860..8523e6923ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java @@ -18,80 +18,92 @@ */ package org.apache.hadoop.hbase.wal; +import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.DEFAULT_NUM_REGION_GROUPS; +import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.NUM_REGION_GROUPS; +import static org.apache.hadoop.hbase.wal.RegionGroupingProvider.*; +import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER; +import static org.junit.Assert.assertEquals; + import java.io.IOException; +import java.util.Arrays; import java.util.HashSet; import java.util.Random; import java.util.Set; -import static org.junit.Assert.assertEquals; -import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.NUM_REGION_GROUPS; -import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.DEFAULT_NUM_REGION_GROUPS; -import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER; -import static org.apache.hadoop.hbase.wal.RegionGroupingProvider.REGION_GROUPING_STRATEGY; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; -@Category({RegionServerTests.class, LargeTests.class}) +@RunWith(Parameterized.class) +@Category({ RegionServerTests.class, LargeTests.class }) public class TestBoundedRegionGroupingStrategy { - protected static final Log LOG = LogFactory.getLog(TestBoundedRegionGroupingStrategy.class); + private static final Log LOG = LogFactory.getLog(TestBoundedRegionGroupingStrategy.class); - @Rule - public TestName currentTest = new TestName(); - protected static Configuration conf; - protected static FileSystem fs; - protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static Configuration CONF; + private static DistributedFileSystem FS; + + @Parameter + public String walProvider; + + @Parameters(name = "{index}: delegate-provider={0}") + public static Iterable data() { + return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" }); + } @Before public void setUp() throws Exception { - FileStatus[] entries = fs.listStatus(new Path("/")); - for (FileStatus dir : entries) { - fs.delete(dir.getPath(), true); - } + CONF.set(DELEGATE_PROVIDER, walProvider); } @After public void tearDown() throws Exception { + FileStatus[] entries = FS.listStatus(new Path("/")); + for (FileStatus dir : entries) { + FS.delete(dir.getPath(), true); + } } @BeforeClass public static void setUpBeforeClass() throws Exception { - conf = TEST_UTIL.getConfiguration(); + CONF = TEST_UTIL.getConfiguration(); // Make block sizes small. - conf.setInt("dfs.blocksize", 1024 * 1024); + CONF.setInt("dfs.blocksize", 1024 * 1024); // quicker heartbeat interval for faster DN death notification - conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000); - conf.setInt("dfs.heartbeat.interval", 1); - conf.setInt("dfs.client.socket-timeout", 5000); + CONF.setInt("dfs.namenode.heartbeat.recheck-interval", 5000); + CONF.setInt("dfs.heartbeat.interval", 1); + CONF.setInt("dfs.client.socket-timeout", 5000); // faster failover with cluster.shutdown();fs.close() idiom - conf.setInt("hbase.ipc.client.connect.max.retries", 1); - conf.setInt("dfs.client.block.recovery.retries", 1); - conf.setInt("hbase.ipc.client.connection.maxidletime", 500); + CONF.setInt("hbase.ipc.client.connect.max.retries", 1); + CONF.setInt("dfs.client.block.recovery.retries", 1); + CONF.setInt("hbase.ipc.client.connection.maxidletime", 500); - conf.setClass(WAL_PROVIDER, RegionGroupingProvider.class, WALProvider.class); - conf.set(REGION_GROUPING_STRATEGY, RegionGroupingProvider.Strategies.bounded.name()); + CONF.setClass(WAL_PROVIDER, RegionGroupingProvider.class, WALProvider.class); + CONF.set(REGION_GROUPING_STRATEGY, RegionGroupingProvider.Strategies.bounded.name()); TEST_UTIL.startMiniDFSCluster(3); - fs = TEST_UTIL.getDFSCluster().getFileSystem(); + FS = TEST_UTIL.getDFSCluster().getFileSystem(); } @AfterClass @@ -106,8 +118,8 @@ public class TestBoundedRegionGroupingStrategy { public void testConcurrentWrites() throws Exception { // Run the WPE tool with three threads writing 3000 edits each concurrently. // When done, verify that all edits were written. - int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), - new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"}); + int errCode = WALPerformanceEvaluation.innerMain(new Configuration(CONF), + new String[] { "-threads", "3", "-verify", "-noclosefs", "-iterations", "3000" }); assertEquals(0, errCode); } @@ -117,39 +129,39 @@ public class TestBoundedRegionGroupingStrategy { @Test public void testMoreRegionsThanBound() throws Exception { final String parallelism = Integer.toString(DEFAULT_NUM_REGION_GROUPS * 2); - int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), - new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", - "-regions", parallelism}); + int errCode = WALPerformanceEvaluation.innerMain(new Configuration(CONF), + new String[] { "-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", + "-regions", parallelism }); assertEquals(0, errCode); } @Test public void testBoundsGreaterThanDefault() throws Exception { - final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); + final int temp = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); try { - conf.setInt(NUM_REGION_GROUPS, temp*4); - final String parallelism = Integer.toString(temp*4); - int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), - new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", - "-regions", parallelism}); + CONF.setInt(NUM_REGION_GROUPS, temp * 4); + final String parallelism = Integer.toString(temp * 4); + int errCode = WALPerformanceEvaluation.innerMain(new Configuration(CONF), + new String[] { "-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", + "-regions", parallelism }); assertEquals(0, errCode); } finally { - conf.setInt(NUM_REGION_GROUPS, temp); + CONF.setInt(NUM_REGION_GROUPS, temp); } } @Test public void testMoreRegionsThanBoundWithBoundsGreaterThanDefault() throws Exception { - final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); + final int temp = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); try { - conf.setInt(NUM_REGION_GROUPS, temp*4); - final String parallelism = Integer.toString(temp*4*2); - int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), - new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", - "-regions", parallelism}); + CONF.setInt(NUM_REGION_GROUPS, temp * 4); + final String parallelism = Integer.toString(temp * 4 * 2); + int errCode = WALPerformanceEvaluation.innerMain(new Configuration(CONF), + new String[] { "-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", + "-regions", parallelism }); assertEquals(0, errCode); } finally { - conf.setInt(NUM_REGION_GROUPS, temp); + CONF.setInt(NUM_REGION_GROUPS, temp); } } @@ -158,32 +170,33 @@ public class TestBoundedRegionGroupingStrategy { */ @Test public void setMembershipDedups() throws IOException { - final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); + final int temp = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); WALFactory wals = null; try { - conf.setInt(NUM_REGION_GROUPS, temp*4); + CONF.setInt(NUM_REGION_GROUPS, temp * 4); // Set HDFS root directory for storing WAL - FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS()); + FSUtils.setRootDir(CONF, TEST_UTIL.getDataTestDirOnTestFS()); - wals = new WALFactory(conf, null, currentTest.getMethodName()); - final Set seen = new HashSet(temp*4); + wals = new WALFactory(CONF, null, "setMembershipDedups"); + final Set seen = new HashSet(temp * 4); final Random random = new Random(); int count = 0; // we know that this should see one of the wals more than once - for (int i = 0; i < temp*8; i++) { + for (int i = 0; i < temp * 8; i++) { final WAL maybeNewWAL = wals.getWAL(Bytes.toBytes(random.nextInt()), null); LOG.info("Iteration " + i + ", checking wal " + maybeNewWAL); if (seen.add(maybeNewWAL)) { count++; } } - assertEquals("received back a different number of WALs that are not equal() to each other " + - "than the bound we placed.", temp*4, count); + assertEquals("received back a different number of WALs that are not equal() to each other " + + "than the bound we placed.", + temp * 4, count); } finally { if (wals != null) { wals.close(); } - conf.setInt(NUM_REGION_GROUPS, temp); + CONF.setInt(NUM_REGION_GROUPS, temp); } } }