HDDS-918. Expose SCMMXBean as a MetricsSource. Contributed by Siddharth Wagle.
This commit is contained in:
parent
771ea6b5e7
commit
43e421afef
|
@ -56,6 +56,14 @@ public interface ContainerManager extends Closeable {
|
|||
*/
|
||||
List<ContainerInfo> getContainers(HddsProtos.LifeCycleState state);
|
||||
|
||||
/**
|
||||
* Returns the number of containers in the given
|
||||
* {@link org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState}.
|
||||
*
|
||||
* @return Number of containers
|
||||
*/
|
||||
Integer getContainerCountByState(HddsProtos.LifeCycleState state);
|
||||
|
||||
/**
|
||||
* Returns the ContainerInfo from the container ID.
|
||||
*
|
||||
|
|
|
@ -17,29 +17,8 @@
|
|||
|
||||
package org.apache.hadoop.hdds.scm.container;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.StorageUnit;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
|
||||
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
import org.apache.hadoop.ozone.common.statemachine
|
||||
.InvalidStateTransitionException;
|
||||
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
|
||||
.FAILED_TO_CHANGE_CONTAINER_STATE;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
|
@ -50,8 +29,29 @@ import java.util.Set;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
|
||||
.FAILED_TO_CHANGE_CONTAINER_STATE;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.StorageUnit;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
|
||||
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
||||
import org.apache.hadoop.ozone.common.statemachine
|
||||
.InvalidStateTransitionException;
|
||||
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.util.concurrent.AtomicLongMap;
|
||||
|
||||
/**
|
||||
* A container state manager keeps track of container states and returns
|
||||
|
@ -121,6 +121,8 @@ public class ContainerStateManager {
|
|||
private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
|
||||
private final ContainerStateMap containers;
|
||||
private final AtomicLong containerCount;
|
||||
private final AtomicLongMap<LifeCycleState> containerStateCount =
|
||||
AtomicLongMap.create();
|
||||
|
||||
/**
|
||||
* Constructs a Container State Manager that tracks all containers owned by
|
||||
|
@ -224,11 +226,12 @@ public class ContainerStateManager {
|
|||
LifeCycleEvent.CLEANUP);
|
||||
}
|
||||
|
||||
void loadContainer(final ContainerInfo containerInfo)
|
||||
throws SCMException {
|
||||
|
||||
void loadContainer(final ContainerInfo containerInfo) throws SCMException {
|
||||
containers.addContainer(containerInfo);
|
||||
containerCount.set(Long.max(
|
||||
containerInfo.getContainerID(), containerCount.get()));
|
||||
containerStateCount.incrementAndGet(containerInfo.getState());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -297,6 +300,7 @@ public class ContainerStateManager {
|
|||
ContainerID.valueof(containerID));
|
||||
Preconditions.checkNotNull(containerInfo);
|
||||
containers.addContainer(containerInfo);
|
||||
containerStateCount.incrementAndGet(containerInfo.getState());
|
||||
LOG.trace("New container allocated: {}", containerInfo);
|
||||
return containerInfo;
|
||||
}
|
||||
|
@ -317,6 +321,8 @@ public class ContainerStateManager {
|
|||
final LifeCycleState newState = stateMachine.getNextState(
|
||||
info.getState(), event);
|
||||
containers.updateState(containerID, info.getState(), newState);
|
||||
containerStateCount.incrementAndGet(newState);
|
||||
containerStateCount.decrementAndGet(info.getState());
|
||||
return containers.getContainerInfo(containerID);
|
||||
} catch (InvalidStateTransitionException ex) {
|
||||
String error = String.format("Failed to update container state %s, " +
|
||||
|
@ -440,6 +446,16 @@ public class ContainerStateManager {
|
|||
return containers.getContainerIDsByState(state);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get count of containers in the current {@link LifeCycleState}.
|
||||
*
|
||||
* @param state {@link LifeCycleState}
|
||||
* @return Count of containers
|
||||
*/
|
||||
Integer getContainerCountByState(final LifeCycleState state) {
|
||||
return Long.valueOf(containerStateCount.get(state)).intValue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a set of ContainerIDs that match the Container.
|
||||
*
|
||||
|
@ -467,8 +483,6 @@ public class ContainerStateManager {
|
|||
return containers.getContainerInfo(containerID);
|
||||
}
|
||||
|
||||
|
||||
|
||||
void close() throws IOException {
|
||||
}
|
||||
|
||||
|
|
|
@ -176,6 +176,16 @@ public class SCMContainerManager implements ContainerManager {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get number of containers in the given state.
|
||||
*
|
||||
* @param state {@link LifeCycleState}
|
||||
* @return Count
|
||||
*/
|
||||
public Integer getContainerCountByState(LifeCycleState state) {
|
||||
return containerStateManager.getContainerCountByState(state);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,87 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.server;
|
||||
|
||||
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
|
||||
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSING;
|
||||
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED;
|
||||
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETING;
|
||||
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
|
||||
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.QUASI_CLOSED;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.Interns;
|
||||
|
||||
/**
|
||||
* Metrics source to report number of containers in different states.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@Metrics(about = "SCM Container Manager Metrics", context = "ozone")
|
||||
public class SCMContainerMetrics implements MetricsSource {
|
||||
|
||||
private final SCMMXBean scmmxBean;
|
||||
private static final String SOURCE = SCMContainerMetrics.class.getName();
|
||||
|
||||
public SCMContainerMetrics(SCMMXBean scmmxBean) {
|
||||
this.scmmxBean = scmmxBean;
|
||||
}
|
||||
|
||||
public static SCMContainerMetrics create(SCMMXBean scmmxBean) {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
return ms.register(SOURCE, "Storage " +
|
||||
"Container Manager Metrics", new SCMContainerMetrics(scmmxBean));
|
||||
}
|
||||
|
||||
public void unRegister() {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
ms.unregisterSource(SOURCE);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("SuspiciousMethodCalls")
|
||||
public void getMetrics(MetricsCollector collector, boolean all) {
|
||||
Map<String, Integer> stateCount = scmmxBean.getContainerStateCount();
|
||||
|
||||
collector.addRecord(SOURCE)
|
||||
.addGauge(Interns.info("OpenContainers",
|
||||
"Number of open containers"),
|
||||
stateCount.get(OPEN.toString()))
|
||||
.addGauge(Interns.info("ClosingContainers",
|
||||
"Number of containers in closing state"),
|
||||
stateCount.get(CLOSING.toString()))
|
||||
.addGauge(Interns.info("QuasiClosedContainers",
|
||||
"Number of containers in quasi closed state"),
|
||||
stateCount.get(QUASI_CLOSED.toString()))
|
||||
.addGauge(Interns.info("ClosedContainers",
|
||||
"Number of containers in closed state"),
|
||||
stateCount.get(CLOSED.toString()))
|
||||
.addGauge(Interns.info("DeletingContainers",
|
||||
"Number of containers in deleting state"),
|
||||
stateCount.get(DELETING.toString()))
|
||||
.addGauge(Interns.info("DeletedContainers",
|
||||
"Number of containers in deleted state"),
|
||||
stateCount.get(DELETED.toString()));
|
||||
}
|
||||
}
|
|
@ -162,7 +162,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
private final SCMDatanodeProtocolServer datanodeProtocolServer;
|
||||
private final SCMBlockProtocolServer blockProtocolServer;
|
||||
private final SCMClientProtocolServer clientProtocolServer;
|
||||
private SCMSecurityProtocolServer securityProtocolServer;
|
||||
private SCMSecurityProtocolServer securityProtocolServer;
|
||||
|
||||
/*
|
||||
* State Managers of SCM.
|
||||
|
@ -206,6 +206,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
private JvmPauseMonitor jvmPauseMonitor;
|
||||
private final OzoneConfiguration configuration;
|
||||
private final ChillModeHandler chillModeHandler;
|
||||
private SCMContainerMetrics scmContainerMetrics;
|
||||
|
||||
/**
|
||||
* Creates a new StorageContainerManager. Configuration will be
|
||||
|
@ -239,7 +240,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
Objects.requireNonNull(conf, "configuration cannot not be null");
|
||||
|
||||
configuration = conf;
|
||||
StorageContainerManager.initMetrics();
|
||||
initMetrics();
|
||||
initContainerReportCache(conf);
|
||||
/**
|
||||
* It is assumed the scm --init command creates the SCM Storage Config.
|
||||
|
@ -366,6 +367,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
|
||||
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
|
||||
registerMXBean();
|
||||
registerMetricsSource(this);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -841,6 +843,10 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
jmxProperties, this);
|
||||
}
|
||||
|
||||
private void registerMetricsSource(SCMMXBean scmMBean) {
|
||||
scmContainerMetrics = SCMContainerMetrics.create(scmMBean);
|
||||
}
|
||||
|
||||
private void unregisterMXBean() {
|
||||
if (this.scmInfoBeanName != null) {
|
||||
MBeans.unregister(this.scmInfoBeanName);
|
||||
|
@ -999,6 +1005,10 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
}
|
||||
|
||||
unregisterMXBean();
|
||||
if (scmContainerMetrics != null) {
|
||||
scmContainerMetrics.unRegister();
|
||||
}
|
||||
|
||||
// Event queue must be stopped before the DB store is closed at the end.
|
||||
try {
|
||||
LOG.info("Stopping SCM Event Queue.");
|
||||
|
@ -1195,8 +1205,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
public Map<String, Integer> getContainerStateCount() {
|
||||
Map<String, Integer> nodeStateCount = new HashMap<>();
|
||||
for (HddsProtos.LifeCycleState state : HddsProtos.LifeCycleState.values()) {
|
||||
nodeStateCount.put(state.toString(), containerManager.getContainers(
|
||||
state).size());
|
||||
nodeStateCount.put(state.toString(),
|
||||
containerManager.getContainerCountByState(state));
|
||||
}
|
||||
return nodeStateCount;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.server;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.metrics2.lib.Interns;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test metrics that represent container states.
|
||||
*/
|
||||
public class TestSCMContainerMetrics {
|
||||
@Test
|
||||
public void testSCMContainerMetrics() {
|
||||
SCMMXBean scmmxBean = mock(SCMMXBean.class);
|
||||
|
||||
Map<String, Integer> stateInfo = new HashMap<String, Integer>() {{
|
||||
put(HddsProtos.LifeCycleState.OPEN.toString(), 2);
|
||||
put(HddsProtos.LifeCycleState.CLOSING.toString(), 3);
|
||||
put(HddsProtos.LifeCycleState.QUASI_CLOSED.toString(), 4);
|
||||
put(HddsProtos.LifeCycleState.CLOSED.toString(), 5);
|
||||
put(HddsProtos.LifeCycleState.DELETING.toString(), 6);
|
||||
put(HddsProtos.LifeCycleState.DELETED.toString(), 7);
|
||||
}};
|
||||
|
||||
|
||||
when(scmmxBean.getContainerStateCount()).thenReturn(stateInfo);
|
||||
|
||||
MetricsRecordBuilder mb = mock(MetricsRecordBuilder.class);
|
||||
when(mb.addGauge(any(MetricsInfo.class), anyInt())).thenReturn(mb);
|
||||
|
||||
MetricsCollector metricsCollector = mock(MetricsCollector.class);
|
||||
when(metricsCollector.addRecord(anyString())).thenReturn(mb);
|
||||
|
||||
SCMContainerMetrics containerMetrics = new SCMContainerMetrics(scmmxBean);
|
||||
|
||||
containerMetrics.getMetrics(metricsCollector, true);
|
||||
|
||||
verify(mb, times(1)).addGauge(Interns.info("OpenContainers",
|
||||
"Number of open containers"), 2);
|
||||
verify(mb, times(1)).addGauge(Interns.info("ClosingContainers",
|
||||
"Number of containers in closing state"), 3);
|
||||
verify(mb, times(1)).addGauge(Interns.info("QuasiClosedContainers",
|
||||
"Number of containers in quasi closed state"), 4);
|
||||
verify(mb, times(1)).addGauge(Interns.info("ClosedContainers",
|
||||
"Number of containers in closed state"), 5);
|
||||
verify(mb, times(1)).addGauge(Interns.info("DeletingContainers",
|
||||
"Number of containers in deleting state"), 6);
|
||||
verify(mb, times(1)).addGauge(Interns.info("DeletedContainers",
|
||||
"Number of containers in deleted state"), 7);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue