HDFS-5167. Merge change r1574603 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1574608 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a2e3fab710
commit
d5da583bda
|
@ -24,6 +24,7 @@ import java.util.UUID;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.ipc.metrics.RetryCacheMetrics;
|
||||
import org.apache.hadoop.util.LightWeightCache;
|
||||
import org.apache.hadoop.util.LightWeightGSet;
|
||||
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
|
||||
|
@ -43,6 +44,8 @@ import com.google.common.base.Preconditions;
|
|||
@InterfaceAudience.Private
|
||||
public class RetryCache {
|
||||
public static final Log LOG = LogFactory.getLog(RetryCache.class);
|
||||
private final RetryCacheMetrics retryCacheMetrics;
|
||||
|
||||
/**
|
||||
* CacheEntry is tracked using unique client ID and callId of the RPC request
|
||||
*/
|
||||
|
@ -178,6 +181,7 @@ public class RetryCache {
|
|||
|
||||
private final LightWeightGSet<CacheEntry, CacheEntry> set;
|
||||
private final long expirationTime;
|
||||
private String cacheName;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
|
@ -191,6 +195,8 @@ public class RetryCache {
|
|||
this.set = new LightWeightCache<CacheEntry, CacheEntry>(capacity, capacity,
|
||||
expirationTime, 0);
|
||||
this.expirationTime = expirationTime;
|
||||
this.cacheName = cacheName;
|
||||
this.retryCacheMetrics = RetryCacheMetrics.create(this);
|
||||
}
|
||||
|
||||
private static boolean skipRetryCache() {
|
||||
|
@ -199,12 +205,29 @@ public class RetryCache {
|
|||
return !Server.isRpcInvocation() || Server.getCallId() < 0
|
||||
|| Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
|
||||
}
|
||||
|
||||
|
||||
|
||||
private void incrCacheClearedCounter() {
|
||||
retryCacheMetrics.incrCacheCleared();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public LightWeightGSet<CacheEntry, CacheEntry> getCacheSet() {
|
||||
return set;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public RetryCacheMetrics getMetricsForTests() {
|
||||
return retryCacheMetrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method returns cache name for metrics.
|
||||
*/
|
||||
public String getCacheName() {
|
||||
return cacheName;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method handles the following conditions:
|
||||
* <ul>
|
||||
|
@ -234,7 +257,10 @@ public class RetryCache {
|
|||
+ newEntry.callId + " to retryCache");
|
||||
}
|
||||
set.put(newEntry);
|
||||
retryCacheMetrics.incrCacheUpdated();
|
||||
return newEntry;
|
||||
} else {
|
||||
retryCacheMetrics.incrCacheHit();
|
||||
}
|
||||
}
|
||||
// Entry already exists in cache. Wait for completion and return its state
|
||||
|
@ -269,6 +295,7 @@ public class RetryCache {
|
|||
synchronized(this) {
|
||||
set.put(newEntry);
|
||||
}
|
||||
retryCacheMetrics.incrCacheUpdated();
|
||||
}
|
||||
|
||||
public void addCacheEntryWithPayload(byte[] clientId, int callId,
|
||||
|
@ -279,6 +306,7 @@ public class RetryCache {
|
|||
synchronized(this) {
|
||||
set.put(newEntry);
|
||||
}
|
||||
retryCacheMetrics.incrCacheUpdated();
|
||||
}
|
||||
|
||||
private static CacheEntry newEntry(long expirationTime) {
|
||||
|
@ -330,6 +358,7 @@ public class RetryCache {
|
|||
public static void clear(RetryCache cache) {
|
||||
if (cache != null) {
|
||||
cache.set.clear();
|
||||
cache.incrCacheClearedCounter();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ipc.metrics;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.ipc.RetryCache;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
|
||||
/**
|
||||
* This class is for maintaining the various RetryCache-related statistics
|
||||
* and publishing them through the metrics interfaces.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@Metrics(about="Aggregate RetryCache metrics", context="rpc")
|
||||
public class RetryCacheMetrics {
|
||||
|
||||
static final Log LOG = LogFactory.getLog(RetryCacheMetrics.class);
|
||||
final MetricsRegistry registry;
|
||||
final String name;
|
||||
|
||||
RetryCacheMetrics(RetryCache retryCache) {
|
||||
name = "RetryCache/"+ retryCache.getCacheName();
|
||||
registry = new MetricsRegistry(name);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Initialized "+ registry);
|
||||
}
|
||||
}
|
||||
|
||||
public String getName() { return name; }
|
||||
|
||||
public static RetryCacheMetrics create(RetryCache cache) {
|
||||
RetryCacheMetrics m = new RetryCacheMetrics(cache);
|
||||
return DefaultMetricsSystem.instance().register(m.name, null, m);
|
||||
}
|
||||
|
||||
@Metric("Number of RetryCache hit") MutableCounterLong cacheHit;
|
||||
@Metric("Number of RetryCache cleared") MutableCounterLong cacheCleared;
|
||||
@Metric("Number of RetryCache updated") MutableCounterLong cacheUpdated;
|
||||
|
||||
/**
|
||||
* One cache hit event
|
||||
*/
|
||||
public void incrCacheHit() {
|
||||
cacheHit.incr();
|
||||
}
|
||||
|
||||
/**
|
||||
* One cache cleared
|
||||
*/
|
||||
public void incrCacheCleared() {
|
||||
cacheCleared.incr();
|
||||
}
|
||||
|
||||
/**
|
||||
* One cache updated
|
||||
*/
|
||||
public void incrCacheUpdated() {
|
||||
cacheUpdated.incr();
|
||||
}
|
||||
|
||||
public long getCacheHit() {
|
||||
return cacheHit.value();
|
||||
}
|
||||
|
||||
public long getCacheCleared() {
|
||||
return cacheCleared.value();
|
||||
}
|
||||
|
||||
public long getCacheUpdated() {
|
||||
return cacheUpdated.value();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ipc;
|
||||
|
||||
import org.apache.hadoop.ipc.metrics.RetryCacheMetrics;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
* Tests for {@link RetryCacheMetrics}
|
||||
*/
|
||||
public class TestRetryCacheMetrics {
|
||||
static final String cacheName = "NameNodeRetryCache";
|
||||
|
||||
@Test
|
||||
public void testNames() {
|
||||
RetryCache cache = mock(RetryCache.class);
|
||||
when(cache.getCacheName()).thenReturn(cacheName);
|
||||
|
||||
RetryCacheMetrics metrics = RetryCacheMetrics.create(cache);
|
||||
|
||||
metrics.incrCacheHit();
|
||||
|
||||
metrics.incrCacheCleared();
|
||||
metrics.incrCacheCleared();
|
||||
|
||||
metrics.incrCacheUpdated();
|
||||
metrics.incrCacheUpdated();
|
||||
metrics.incrCacheUpdated();
|
||||
|
||||
checkMetrics(1, 2, 3);
|
||||
}
|
||||
|
||||
private void checkMetrics(long hit, long cleared, long updated) {
|
||||
MetricsRecordBuilder rb = getMetrics("RetryCache/" + cacheName);
|
||||
assertCounter("CacheHit", hit, rb);
|
||||
assertCounter("CacheCleared", cleared, rb);
|
||||
assertCounter("CacheUpdated", updated, rb);
|
||||
}
|
||||
}
|
|
@ -130,6 +130,9 @@ Release 2.4.0 - UNRELEASED
|
|||
|
||||
HDFS-5321. Clean up the HTTP-related configuration in HDFS (wheat9)
|
||||
|
||||
HDFS-5167. Add metrics about the NameNode retry cache. (Tsuyoshi OZAWA via
|
||||
jing9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
|
||||
|
|
|
@ -794,7 +794,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
retryCache.addCacheEntry(clientId, callId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
static RetryCache initRetryCache(Configuration conf) {
|
||||
boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
|
||||
|
@ -811,7 +811,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
+ " of total heap and retry cache entry expiry time is "
|
||||
+ entryExpiryMillis + " millis");
|
||||
long entryExpiryNanos = entryExpiryMillis * 1000 * 1000;
|
||||
return new RetryCache("Namenode Retry Cache", heapPercent,
|
||||
return new RetryCache("NameNodeRetryCache", heapPercent,
|
||||
entryExpiryNanos);
|
||||
}
|
||||
return null;
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||
import org.apache.hadoop.ipc.metrics.RetryCacheMetrics;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.junit.Before;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
|
||||
|
||||
/**
|
||||
* Tests for ensuring the namenode retry cache metrics works correctly for
|
||||
* non-idempotent requests.
|
||||
*
|
||||
* Retry cache works based on tracking previously received request based on the
|
||||
* ClientId and CallId received in RPC requests and storing the response. The
|
||||
* response is replayed on retry when the same request is received again.
|
||||
*
|
||||
*/
|
||||
public class TestNameNodeRetryCacheMetrics {
|
||||
private MiniDFSCluster cluster;
|
||||
private FSNamesystem namesystem;
|
||||
private DistributedFileSystem filesystem;
|
||||
private int namenodeId = 0;
|
||||
private Configuration conf;
|
||||
private RetryCacheMetrics metrics;
|
||||
|
||||
private DFSClient client;
|
||||
|
||||
/** Start a cluster */
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
conf = new HdfsConfiguration();
|
||||
conf.setBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, true);
|
||||
conf.setInt(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, 2);
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(3)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
cluster.transitionToActive(namenodeId);
|
||||
HATestUtil.setFailoverConfigurations(cluster, conf);
|
||||
filesystem = (DistributedFileSystem) HATestUtil.configureFailoverFs(cluster, conf);
|
||||
namesystem = cluster.getNamesystem(namenodeId);
|
||||
metrics = namesystem.getRetryCache().getMetricsForTests();
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup after the test
|
||||
* @throws IOException
|
||||
**/
|
||||
@After
|
||||
public void cleanup() throws IOException {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetryCacheMetrics() throws IOException {
|
||||
checkMetrics(0, 0, 0);
|
||||
|
||||
// DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY is 2 ,
|
||||
// so 2 requests are dropped at first.
|
||||
// After that, 1 request will reach NameNode correctly.
|
||||
trySaveNamespace();
|
||||
checkMetrics(2, 0, 1);
|
||||
|
||||
// RetryCache will be cleared after Namesystem#close()
|
||||
namesystem.close();
|
||||
checkMetrics(2, 1, 1);
|
||||
}
|
||||
|
||||
private void checkMetrics(long hit, long cleared, long updated) {
|
||||
assertEquals("CacheHit", hit, metrics.getCacheHit());
|
||||
assertEquals("CacheCleared", cleared, metrics.getCacheCleared());
|
||||
assertEquals("CacheUpdated", updated, metrics.getCacheUpdated());
|
||||
}
|
||||
|
||||
private void trySaveNamespace() throws IOException {
|
||||
filesystem.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
|
||||
filesystem.saveNamespace();
|
||||
filesystem.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
|
||||
}
|
||||
|
||||
}
|
|
@ -1181,6 +1181,26 @@ public class TestRetryCacheWithHA {
|
|||
LOG.info("Got the result of " + op.name + ": "
|
||||
+ results.get(op.name));
|
||||
}
|
||||
|
||||
// Waiting for failover.
|
||||
while (cluster.getNamesystem(1).isInStandbyState()) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
long hitNN0 = cluster.getNamesystem(0).getRetryCache().getMetricsForTests()
|
||||
.getCacheHit();
|
||||
long hitNN1 = cluster.getNamesystem(1).getRetryCache().getMetricsForTests()
|
||||
.getCacheHit();
|
||||
assertTrue("CacheHit: " + hitNN0 + ", " + hitNN1,
|
||||
hitNN0 + hitNN1 > 0);
|
||||
long updatedNN0 = cluster.getNamesystem(0).getRetryCache()
|
||||
.getMetricsForTests().getCacheUpdated();
|
||||
long updatedNN1 = cluster.getNamesystem(1).getRetryCache()
|
||||
.getMetricsForTests().getCacheUpdated();
|
||||
// Cache updated metrics on NN0 should be >0 since the op was process on NN0
|
||||
assertTrue("CacheUpdated on NN0: " + updatedNN0, updatedNN0 > 0);
|
||||
// Cache updated metrics on NN0 should be >0 since NN1 applied the editlog
|
||||
assertTrue("CacheUpdated on NN1: " + updatedNN1, updatedNN1 > 0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue