From 44b091a8d776db954db1f9ddbba91f7b9445c8ec Mon Sep 17 00:00:00 2001 From: Hanisha Koneru Date: Tue, 3 Jul 2018 09:50:25 -0700 Subject: [PATCH] HDDS-205. Add metrics to HddsDispatcher. Contributed by Bharat Viswanadham. --- .../common/helpers/ContainerMetrics.java | 2 +- .../container/common/impl/Dispatcher.java | 2 +- .../container/common/impl/HddsDispatcher.java | 24 ++++++-- .../container/common/interfaces/Handler.java | 14 +++-- .../container/keyvalue/KeyValueHandler.java | 41 +++++++++++-- .../keyvalue/TestKeyValueHandler.java | 13 +++- .../metrics/TestContainerMetrics.java | 59 +++++++++++-------- 7 files changed, 110 insertions(+), 45 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java index 714db598d7c..2879001c28b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java @@ -94,7 +94,7 @@ public class ContainerMetrics { new ContainerMetrics(intervals)); } - public void incContainerOpcMetrics(ContainerProtos.Type type){ + public void incContainerOpsMetrics(ContainerProtos.Type type) { numOps.incr(); numOpsArray[type.ordinal()].incr(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java index c485caf6d21..3ffe6e4f4e6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java @@ -111,7 +111,7 @@ public class Dispatcher implements ContainerDispatcher { try { Preconditions.checkNotNull(msg); Type cmdType = msg.getCmdType(); - metrics.incContainerOpcMetrics(cmdType); + metrics.incContainerOpsMetrics(cmdType); if ((cmdType == Type.CreateContainer) || (cmdType == Type.DeleteContainer) || (cmdType == Type.ReadContainer) || diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index cbb48ec5da4..25700f9ff5e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.Handler; @@ -53,6 +54,7 @@ public class HddsDispatcher implements ContainerDispatcher { private final ContainerSet containerSet; private final VolumeSet volumeSet; private String scmID; + private ContainerMetrics metrics; /** * Constructs an OzoneContainer that receives calls from @@ -60,16 +62,17 @@ public class HddsDispatcher implements ContainerDispatcher { */ public HddsDispatcher(Configuration config, ContainerSet contSet, VolumeSet volumes) { - //TODO: initialize metrics this.conf = config; this.containerSet = contSet; this.volumeSet = volumes; this.handlers = Maps.newHashMap(); + this.metrics = ContainerMetrics.create(conf); for (ContainerType containerType : ContainerType.values()) { handlers.put(containerType, Handler.getHandlerForContainerType( - containerType, conf, containerSet, volumeSet)); + containerType, conf, containerSet, volumeSet, metrics)); } + } @Override @@ -89,10 +92,14 @@ public class HddsDispatcher implements ContainerDispatcher { Container container = null; ContainerType containerType = null; + ContainerCommandResponseProto responseProto = null; + long startTime = System.nanoTime(); + ContainerProtos.Type cmdType = msg.getCmdType(); try { long containerID = getContainerID(msg); - if (msg.getCmdType() != ContainerProtos.Type.CreateContainer) { + metrics.incContainerOpsMetrics(cmdType); + if (cmdType != ContainerProtos.Type.CreateContainer) { container = getContainer(containerID); containerType = getContainerType(container); } else { @@ -109,7 +116,11 @@ public class HddsDispatcher implements ContainerDispatcher { ContainerProtos.Result.CONTAINER_INTERNAL_ERROR); return ContainerUtils.logAndReturnError(LOG, ex, msg); } - return handler.handle(msg, container); + responseProto = handler.handle(msg, container); + if (responseProto != null) { + metrics.incContainerOpsLatencies(cmdType, System.nanoTime() - startTime); + } + return responseProto; } @Override @@ -187,4 +198,9 @@ public class HddsDispatcher implements ContainerDispatcher { private ContainerType getContainerType(Container container) { return container.getContainerType(); } + + @VisibleForTesting + public void setMetricsForTesting(ContainerMetrics containerMetrics) { + this.metrics = containerMetrics; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index 8069d71788d..57dd2245a24 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.container.common.interfaces; -import com.sun.jersey.spi.resource.Singleton; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; @@ -26,7 +26,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerType; -import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; @@ -42,19 +42,22 @@ public class Handler { protected final ContainerSet containerSet; protected final VolumeSet volumeSet; protected String scmID; + protected final ContainerMetrics metrics; protected Handler(Configuration config, ContainerSet contSet, - VolumeSet volumeSet) { + VolumeSet volumeSet, ContainerMetrics containerMetrics) { conf = config; containerSet = contSet; this.volumeSet = volumeSet; + this.metrics = containerMetrics; } public static Handler getHandlerForContainerType(ContainerType containerType, - Configuration config, ContainerSet contSet, VolumeSet volumeSet) { + Configuration config, ContainerSet contSet, VolumeSet volumeSet, + ContainerMetrics metrics) { switch (containerType) { case KeyValueContainer: - return KeyValueHandler.getInstance(config, contSet, volumeSet); + return KeyValueHandler.getInstance(config, contSet, volumeSet, metrics); default: throw new IllegalArgumentException("Handler for ContainerType: " + containerType + "doesn't exist."); @@ -69,4 +72,5 @@ public class Handler { public void setScmID(String scmId) { this.scmID = scmId; } + } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index ffe0f21cd4e..d174383048a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils; @@ -71,7 +72,20 @@ import java.util.List; import java.util.Map; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.*; + .Result.CONTAINER_INTERNAL_ERROR; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.CLOSED_CONTAINER_IO; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.DELETE_ON_OPEN_CONTAINER; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.IO_EXCEPTION; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.INVALID_CONTAINER_STATE; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.GET_SMALL_FILE_ERROR; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.PUT_SMALL_FILE_ERROR; + import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Stage; @@ -94,16 +108,18 @@ public class KeyValueHandler extends Handler { // TODO : Add metrics and populate it. public static KeyValueHandler getInstance(Configuration config, - ContainerSet contSet, VolumeSet volSet) { + ContainerSet contSet, + VolumeSet volSet, + ContainerMetrics metrics) { if (INSTANCE == null) { - INSTANCE = new KeyValueHandler(config, contSet, volSet); + INSTANCE = new KeyValueHandler(config, contSet, volSet, metrics); } return INSTANCE; } private KeyValueHandler(Configuration config, ContainerSet contSet, - VolumeSet volSet) { - super(config, contSet, volSet); + VolumeSet volSet, ContainerMetrics metrics) { + super(config, contSet, volSet, metrics); containerType = ContainerType.KeyValueContainer; keyManager = new KeyManagerImpl(config); chunkManager = new ChunkManagerImpl(); @@ -342,6 +358,8 @@ public class KeyValueHandler extends Handler { Preconditions.checkNotNull(keyData); keyManager.putKey(kvContainer, keyData); + long numBytes = keyData.getProtoBufMessage().toByteArray().length; + metrics.incContainerBytesStats(Type.PutKey, numBytes); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { @@ -370,6 +388,8 @@ public class KeyValueHandler extends Handler { BlockID blockID = BlockID.getFromProtobuf( request.getGetKey().getBlockID()); responseData = keyManager.getKey(kvContainer, blockID); + long numBytes = responseData.getProtoBufMessage().toByteArray().length; + metrics.incContainerBytesStats(Type.GetKey, numBytes); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); @@ -434,6 +454,7 @@ public class KeyValueHandler extends Handler { Preconditions.checkNotNull(chunkInfo); data = chunkManager.readChunk(kvContainer, blockID, chunkInfo); + metrics.incContainerBytesStats(Type.ReadChunk, data.length); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { @@ -507,6 +528,13 @@ public class KeyValueHandler extends Handler { chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, request.getWriteChunk().getStage()); + + // We should increment stats after writeChunk + if (request.getWriteChunk().getStage() == Stage.WRITE_DATA || + request.getWriteChunk().getStage() == Stage.COMBINED) { + metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk() + .getChunkData().getLen()); + } } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { @@ -555,6 +583,7 @@ public class KeyValueHandler extends Handler { chunks.add(chunkInfo.getProtoBufMessage()); keyData.setChunks(chunks); keyManager.putKey(kvContainer, keyData); + metrics.incContainerBytesStats(Type.PutSmallFile, data.length); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); @@ -597,7 +626,7 @@ public class KeyValueHandler extends Handler { dataBuf = dataBuf.concat(current); chunkInfo = chunk; } - + metrics.incContainerBytesStats(Type.GetSmallFile, dataBuf.size()); return SmallFileUtils.getGetSmallFileResponseSuccess(request, dataBuf .toByteArray(), ChunkInfo.getFromProtoBuf(chunkInfo)); } catch (StorageContainerException e) { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index dbddf47d554..947ad5141d9 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.Container; @@ -34,11 +35,14 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestRule; import org.junit.rules.Timeout; + +import org.mockito.Mockito; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import org.mockito.Mockito; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.times; + import java.util.UUID; /** @@ -93,10 +97,13 @@ public class TestKeyValueHandler { Mockito.when(dispatcher.getContainer(anyLong())).thenReturn( Mockito.mock(KeyValueContainer.class)); Mockito.when(handler.handle(any(), any())).thenCallRealMethod(); + doCallRealMethod().when(dispatcher).setMetricsForTesting(any()); + dispatcher.setMetricsForTesting(Mockito.mock(ContainerMetrics.class)); // Test Create Container Request handling ContainerCommandRequestProto createContainerRequest = getDummyCommandRequestProto(ContainerProtos.Type.CreateContainer); + dispatcher.dispatch(createContainerRequest); Mockito.verify(handler, times(1)).handleCreateContainer( any(ContainerCommandRequestProto.class), any()); @@ -207,8 +214,8 @@ public class TestKeyValueHandler { any(ContainerCommandRequestProto.class), any()); } - private ContainerCommandRequestProto getDummyCommandRequestProto - (ContainerProtos.Type cmdType) { + private ContainerCommandRequestProto getDummyCommandRequestProto( + ContainerProtos.Type cmdType) { ContainerCommandRequestProto request = ContainerProtos.ContainerCommandRequestProto.newBuilder() .setCmdType(cmdType) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java index ccad6f8d604..ef4b423c980 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java @@ -20,9 +20,10 @@ package org.apache.hadoop.ozone.container.metrics; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; -import static org.mockito.Mockito.mock; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -34,18 +35,19 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; -import org.apache.hadoop.ozone.container.common.impl.Dispatcher; -import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; +import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.hdds.scm.XceiverClient; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; import org.junit.Test; -import org.mockito.Mockito; + +import java.io.File; +import java.util.UUID; /** * Test for metrics published by storage containers. @@ -57,7 +59,7 @@ public class TestContainerMetrics { XceiverServer server = null; XceiverClient client = null; long containerID = ContainerTestHelper.getTestContainerID(); - String keyName = OzoneUtils.getRequestID(); + String path = GenericTestUtils.getRandomizedTempPath(); try { final int interval = 1; @@ -70,22 +72,14 @@ public class TestContainerMetrics { conf.setInt(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, interval); - // Since we are only testing for container metrics and we can just - // mock the ContainerManager and ChunkManager instances instead of - // starting the whole cluster. - ContainerManager containerManager = mock(ContainerManager.class); - ChunkManager chunkManager = mock(ChunkManager.class); - Mockito.doNothing().when(chunkManager).writeChunk( - Mockito.any(BlockID.class), - Mockito.any(ChunkInfo.class), Mockito.any(byte[].class), - Mockito.any(ContainerProtos.Stage.class)); - - Mockito.doReturn(chunkManager).when(containerManager).getChunkManager(); - Mockito.doReturn(true).when(containerManager).isOpen(containerID); - - Dispatcher dispatcher = new Dispatcher(containerManager, conf); - dispatcher.init(); DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); + conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, path); + VolumeSet volumeSet = new VolumeSet(datanodeDetails, conf); + ContainerSet containerSet = new ContainerSet(); + HddsDispatcher dispatcher = new HddsDispatcher(conf, containerSet, + volumeSet); + dispatcher.setScmId(UUID.randomUUID().toString()); + server = new XceiverServer(datanodeDetails, conf, dispatcher); client = new XceiverClient(pipeline, conf); @@ -102,6 +96,8 @@ public class TestContainerMetrics { // Write Chunk BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); + ContainerTestHelper.getWriteChunkRequest( + pipeline, blockID, 1024); ContainerProtos.ContainerCommandRequestProto writeChunkRequest = ContainerTestHelper.getWriteChunkRequest( pipeline, blockID, 1024); @@ -109,13 +105,21 @@ public class TestContainerMetrics { Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); + //Read Chunk + ContainerProtos.ContainerCommandRequestProto readChunkRequest = + ContainerTestHelper.getReadChunkRequest(pipeline, writeChunkRequest + .getWriteChunk()); + response = client.sendCommand(readChunkRequest); + Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); + MetricsRecordBuilder containerMetrics = getMetrics( "StorageContainerMetrics"); - assertCounter("NumOps", 2L, containerMetrics); + assertCounter("NumOps", 3L, containerMetrics); assertCounter("numCreateContainer", 1L, containerMetrics); assertCounter("numWriteChunk", 1L, containerMetrics); + assertCounter("numReadChunk", 1L, containerMetrics); assertCounter("bytesWriteChunk", 1024L, containerMetrics); - assertCounter("LatencyWriteChunkNumOps", 1L, containerMetrics); + assertCounter("bytesReadChunk", 1024L, containerMetrics); String sec = interval + "s"; Thread.sleep((interval + 1) * 1000); @@ -127,6 +131,11 @@ public class TestContainerMetrics { if (server != null) { server.stop(); } + // clean up volume dir + File file = new File(path); + if(file.exists()) { + FileUtil.fullyDelete(file); + } } } } \ No newline at end of file