Added new SysMonitorOshi v0 using Oshi library (#14359)

Added a new monitor SysMonitorOshi to replace SysMonitor. The new monitor has a wider support for different machine architectures including ARM instances. Please switch to SysMonitorOshi as SysMonitor is now deprecated and will be removed in future releases.
This commit is contained in:
Hardik Bajaj 2023-06-20 20:57:58 +05:30 committed by GitHub
parent f5cc823d0f
commit 1ea9158a50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1254 additions and 4 deletions

View File

@ -279,6 +279,9 @@ SOURCE/JAVA-CORE
This product contains lpad and rpad methods adapted from Apache Flink.
* processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java
This product contains SystemInfo methods adapted from oshi
* processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java
MIT License
================================

View File

@ -292,6 +292,7 @@ def build_compatible_license_names():
compatible_licenses['MIT License'] = 'MIT License'
compatible_licenses['The MIT License (MIT)'] = 'MIT License'
compatible_licenses['Bouncy Castle Licence'] = 'MIT License'
compatible_licenses['SPDX-License-Identifier: MIT'] = 'MIT License'
compatible_licenses['The Go license'] = 'The Go license'
@ -435,7 +436,6 @@ if __name__ == "__main__":
license_yaml = args.license_yaml
dependency_reports_root = args.dependency_reports_root
check_licenses(license_yaml, dependency_reports_root)
except KeyboardInterrupt:

View File

@ -1591,7 +1591,7 @@ name: Java Native Access (JNA)
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 4.5.1
version: 5.13.0
libraries:
- net.java.dev.jna: jna
@ -2341,6 +2341,17 @@ notices:
---
name: OSHI
license_category: binary
module: java-core
license_name: MIT License
version: 6.4.2
libraries:
- com.github.oshi: oshi-core
license_file_path: licenses/bin/oshi.MIT
---
name: JBoss Logging 3
license_category: binary
module: java-core
@ -4914,7 +4925,7 @@ libraries:
name: net.java.dev.jna jna-platform
license_category: binary
version: 5.2.0
version: 5.13.0
module: druid-ranger-security
license_name: Apache License version 2.0
libraries:

21
licenses/bin/oshi.MIT Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2010-2023 The OSHI Project Contributors: https://github.com/oshi/oshi/graphs/contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -109,6 +109,8 @@
<protobuf.version>3.21.7</protobuf.version>
<resilience4j.version>1.3.1</resilience4j.version>
<slf4j.version>1.7.36</slf4j.version>
<jna.version>5.13.0</jna.version>
<jna-platform.version>5.13.0</jna-platform.version>
<hadoop.compile.version>3.3.5</hadoop.compile.version>
<mockito.version>4.3.1</mockito.version>
<aws.sdk.version>1.12.317</aws.sdk.version>
@ -882,7 +884,12 @@
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>4.5.1</version>
<version>${jna.version}</version>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna-platform</artifactId>
<version>${jna-platform.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>

View File

@ -37,6 +37,7 @@
<sigar.base.version>1.6.5</sigar.base.version>
<sigar.version>${sigar.base.version}.132</sigar.version>
<ipaddress.version>5.3.4</ipaddress.version>
<oshi.version>6.4.2</oshi.version>
</properties>
<dependencies>
@ -335,6 +336,11 @@
dependency are copied as resources. See maven-dependency-plugin configuration and <resources> below. -->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
<version>${oshi.version}</version>
</dependency>
<!-- Tests -->

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
public class NoopOshiSysMonitor extends OshiSysMonitor
{
public NoopOshiSysMonitor()
{
super();
}
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
return false;
}
}

View File

@ -0,0 +1,466 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;
import oshi.hardware.CentralProcessor.TickType;
import oshi.hardware.GlobalMemory;
import oshi.hardware.HWDiskStore;
import oshi.hardware.HardwareAbstractionLayer;
import oshi.hardware.NetworkIF;
import oshi.hardware.VirtualMemory;
import oshi.software.os.FileSystem;
import oshi.software.os.InternetProtocolStats;
import oshi.software.os.OSFileStore;
import oshi.software.os.OperatingSystem;
import java.util.List;
import java.util.Map;
/**
* SysMonitor implemented using {@link oshi}
* <p>
* Following stats are emitted:
* <ul>
* <li>{@link MemStats} for Memory related metrics</li>
* <li>{@link SwapStats} for swap storage related metrics</li>
* <li>{@link FsStats} for File System related Metrics</li>
* <li>{@link DiskStats} for Disk level metrics</li>
* <li>{@link NetStats} for Network Interface and related metrics</li>
* <li>{@link CpuStats} for CPU usage and stats metrics</li>
* <li>{@link SysStats} for overall system metrics(uptime, avg load)</li>
* <li>{@link TcpStats} for TCP related metrics</li>
* </ul>
*/
public class OshiSysMonitor extends FeedDefiningMonitor
{
private final SystemInfo si;
private final HardwareAbstractionLayer hal;
private final OperatingSystem os;
private static final List<String> NET_ADDRESS_BLACKLIST = ImmutableList.of("0.0.0.0", "127.0.0.1");
private final MemStats memStats;
private final SwapStats swapStats;
private final FsStats fsStats;
private final DiskStats diskStats;
private final NetStats netStats;
private final CpuStats cpuStats;
private final SysStats sysStats;
private final TcpStats tcpStats;
private final Map<String, String[]> dimensions;
public OshiSysMonitor()
{
this(ImmutableMap.of());
}
public OshiSysMonitor(Map<String, String[]> dimensions)
{
this(dimensions, DEFAULT_METRICS_FEED);
}
public OshiSysMonitor(Map<String, String[]> dimensions, String feed)
{
super(feed);
Preconditions.checkNotNull(dimensions);
this.dimensions = ImmutableMap.copyOf(dimensions);
this.si = new SystemInfo();
this.hal = si.getHardware();
this.os = si.getOperatingSystem();
this.memStats = new MemStats();
this.swapStats = new SwapStats();
this.fsStats = new FsStats();
this.diskStats = new DiskStats();
this.netStats = new NetStats();
this.cpuStats = new CpuStats();
this.sysStats = new SysStats();
this.tcpStats = new TcpStats();
}
// Create an object with mocked systemInfo for testing purposes
public OshiSysMonitor(SystemInfo systemInfo)
{
super("metrics");
this.dimensions = ImmutableMap.of();
this.si = systemInfo;
this.hal = si.getHardware();
this.os = si.getOperatingSystem();
this.memStats = new MemStats();
this.swapStats = new SwapStats();
this.fsStats = new FsStats();
this.diskStats = new DiskStats();
this.netStats = new NetStats();
this.cpuStats = new CpuStats();
this.sysStats = new SysStats();
this.tcpStats = new TcpStats();
}
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
monitorMemStats(emitter);
monitorSwapStats(emitter);
monitorFsStats(emitter);
monitorDiskStats(emitter);
monitorNetStats(emitter);
monitorCpuStats(emitter);
monitorSysStats(emitter);
monitorTcpStats(emitter);
return true;
}
// Emit stats for a particular stat(mem, swap, filestore, etc) from statsList for testing
public void monitorMemStats(ServiceEmitter emitter)
{
memStats.emit(emitter);
}
public void monitorSwapStats(ServiceEmitter emitter)
{
swapStats.emit(emitter);
}
public void monitorFsStats(ServiceEmitter emitter)
{
fsStats.emit(emitter);
}
public void monitorDiskStats(ServiceEmitter emitter)
{
diskStats.emit(emitter);
}
public void monitorNetStats(ServiceEmitter emitter)
{
netStats.emit(emitter);
}
public void monitorCpuStats(ServiceEmitter emitter)
{
cpuStats.emit(emitter);
}
public void monitorSysStats(ServiceEmitter emitter)
{
sysStats.emit(emitter);
}
public void monitorTcpStats(ServiceEmitter emitter)
{
tcpStats.emit(emitter);
}
/**
* Implementation of Memstats
* <p>
* Define a method {@link #emit(ServiceEmitter)} to emit metrices in emiters
*/
private class MemStats
{
public void emit(ServiceEmitter emitter)
{
GlobalMemory mem = hal.getMemory();
if (mem != null) {
final Map<String, Long> stats = ImmutableMap.of(
"sys/mem/max",
mem.getTotal(),
"sys/mem/used",
mem.getTotal() - mem.getAvailable(),
// This is total actual memory used, not including cache and buffer memory
"sys/mem/free",
mem.getAvailable()
);
final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry<String, Long> entry : stats.entrySet()) {
emitter.emit(builder.build(entry.getKey(), entry.getValue()));
}
}
}
}
private class SwapStats
{
private long prevPageIn = 0;
private long prevPageOut = 0;
public void emit(ServiceEmitter emitter)
{
VirtualMemory swap = hal.getMemory().getVirtualMemory();
if (swap != null) {
long currPageIn = swap.getSwapPagesIn();
long currPageOut = swap.getSwapPagesOut();
final Map<String, Long> stats = ImmutableMap.of(
"sys/swap/pageIn", currPageIn - prevPageIn,
"sys/swap/pageOut", currPageOut - prevPageOut,
"sys/swap/max", swap.getSwapTotal(),
"sys/swap/free", swap.getSwapTotal() - swap.getSwapUsed()
);
final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry<String, Long> entry : stats.entrySet()) {
emitter.emit(builder.build(entry.getKey(), entry.getValue()));
}
this.prevPageIn = currPageIn;
this.prevPageOut = currPageOut;
}
}
}
private class FsStats
{
public void emit(ServiceEmitter emitter)
{
FileSystem fileSystem = os.getFileSystem();
for (OSFileStore fs : fileSystem.getFileStores(true)) { // get only local file store : true
final Map<String, Long> stats = ImmutableMap.<String, Long>builder()
.put("sys/fs/max", fs.getTotalSpace())
.put("sys/fs/used", fs.getTotalSpace() - fs.getUsableSpace())
.put("sys/fs/files/count", fs.getTotalInodes())
.put("sys/fs/files/free", fs.getFreeInodes())
.build();
final ServiceMetricEvent.Builder builder = builder()
.setDimension("fsDevName", fs.getVolume())
.setDimension("fsDirName", fs.getMount());
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry<String, Long> entry : stats.entrySet()) {
emitter.emit(builder.build(entry.getKey(), entry.getValue()));
}
}
}
}
private class DiskStats
{
// Difference b/w metrics of two consecutive values. It tells Δmetric (increase/decrease in metrics value)
private final KeyedDiff diff = new KeyedDiff();
public void emit(ServiceEmitter emitter)
{
List<HWDiskStore> disks = hal.getDiskStores();
// disk partitions can be mapped to file system but no inbuilt method is there to find relation b/w disks and file system
// Will have to add logic for that
for (HWDiskStore disk : disks) {
final Map<String, Long> stats = diff.to(
disk.getName(),
ImmutableMap.<String, Long>builder()
.put("sys/disk/read/size", disk.getReadBytes())
.put("sys/disk/read/count", disk.getReads())
.put("sys/disk/write/size", disk.getWriteBytes())
.put("sys/disk/write/count", disk.getWrites())
.put("sys/disk/queue", disk.getCurrentQueueLength())
.put("sys/disk/transferTime", disk.getTransferTime())
.build()
);
if (stats != null) {
final ServiceMetricEvent.Builder builder = builder()
.setDimension("diskName", disk.getName());
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry<String, Long> entry : stats.entrySet()) {
emitter.emit(builder.build(entry.getKey(), entry.getValue()));
}
}
}
}
}
private class NetStats
{
private final KeyedDiff diff = new KeyedDiff();
public void emit(ServiceEmitter emitter)
{
List<NetworkIF> networkIFS = hal.getNetworkIFs();
for (NetworkIF net : networkIFS) {
final String name = net.getName();
for (String addr : net.getIPv4addr()) {
if (!NET_ADDRESS_BLACKLIST.contains(addr)) {
// Only emit metrics for non black-listed ip addresses
String mapKey = name
+ "_"
+ addr; // Network_Name_IPV4 address as key, ex: wifi_192.1.0.1 to uniquely identify the dimension
final Map<String, Long> stats = diff.to(
mapKey,
ImmutableMap.<String, Long>builder()
.put("sys/net/read/size", net.getBytesRecv())
.put("sys/net/read/packets", net.getPacketsRecv())
.put("sys/net/read/errors", net.getInErrors())
.put("sys/net/read/dropped", net.getInDrops())
.put("sys/net/write/size", net.getBytesSent())
.put("sys/net/write/packets", net.getPacketsSent())
.put("sys/net/write/errors", net.getOutErrors())
.put("sys/net/write/collisions", net.getCollisions())
.build()
);
if (stats != null) {
final ServiceMetricEvent.Builder builder = builder()
.setDimension("netName", net.getName())
.setDimension("netAddress", addr)
.setDimension("netHwaddr", net.getMacaddr());
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry<String, Long> entry : stats.entrySet()) {
emitter.emit(builder.build(entry.getKey(), entry.getValue()));
}
}
}
}
}
}
}
private class CpuStats
{
private final KeyedDiff diff = new KeyedDiff();
public void emit(ServiceEmitter emitter)
{
CentralProcessor processor = hal.getProcessor();
long[][] procTicks = processor.getProcessorCpuLoadTicks();
for (int i = 0; i < procTicks.length; ++i) {
final String name = Integer.toString(i);
long[] ticks = procTicks[i];
long user = ticks[TickType.USER.getIndex()];
long nice = ticks[TickType.NICE.getIndex()];
long sys = ticks[TickType.SYSTEM.getIndex()];
long idle = ticks[TickType.IDLE.getIndex()];
long iowait = ticks[TickType.IOWAIT.getIndex()];
long irq = ticks[TickType.IRQ.getIndex()];
long softirq = ticks[TickType.SOFTIRQ.getIndex()];
long steal = ticks[TickType.STEAL.getIndex()];
long totalCpu = user + nice + sys + idle + iowait + irq + softirq + steal;
final Map<String, Long> stats = diff.to(
name,
ImmutableMap.<String, Long>builder()
.put("user", user) // user = Δuser / Δtotal
.put("sys", sys) // sys = Δsys / Δtotal
.put("nice", nice) // nice = Δnice / Δtotal
.put("wait", iowait) // wait = Δwait / Δtotal
.put("irq", irq) // irq = Δirq / Δtotal
.put("softIrq", softirq) // softIrq = ΔsoftIrq / Δtotal
.put("stolen", steal) // stolen = Δstolen / Δtotal
.put("idle", idle) // idle = Δidle / Δtotal
.put("_total", totalCpu) // (not reported)
.build()
);
if (stats != null) {
final long total = stats.remove("_total");
for (Map.Entry<String, Long> entry : stats.entrySet()) {
final ServiceMetricEvent.Builder builder = builder()
.setDimension("cpuName", name)
.setDimension("cpuTime", entry.getKey());
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
if (total != 0) {
// prevent divide by 0 exception and don't emit such events
emitter.emit(builder.build("sys/cpu", entry.getValue() * 100 / total)); // [0,100]
}
}
}
}
}
}
private class SysStats
{
public void emit(ServiceEmitter emitter)
{
final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
long uptime = os.getSystemUptime();
final Map<String, Number> stats = ImmutableMap.of(
"sys/uptime", uptime
);
for (Map.Entry<String, Number> entry : stats.entrySet()) {
emitter.emit(builder.build(entry.getKey(), entry.getValue()));
}
CentralProcessor processor = hal.getProcessor();
double[] la = processor.getSystemLoadAverage(3);
if (la != null) {
final Map<String, Number> statsCpuLoadAverage = ImmutableMap.of(
"sys/la/1", la[0],
"sys/la/5", la[1],
"sys/la/15", la[2]
);
for (Map.Entry<String, Number> entry : statsCpuLoadAverage.entrySet()) {
emitter.emit(builder.build(entry.getKey(), entry.getValue()));
}
}
}
}
private class TcpStats
{
private final KeyedDiff diff = new KeyedDiff();
public void emit(ServiceEmitter emitter)
{
final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
InternetProtocolStats ipstats = os.getInternetProtocolStats();
InternetProtocolStats.TcpStats tcpv4 = ipstats.getTCPv4Stats();
if (tcpv4 != null) {
final Map<String, Long> stats = diff.to(
"tcpv4", ImmutableMap.<String, Long>builder()
.put("sys/tcpv4/activeOpens", tcpv4.getConnectionsActive())
.put("sys/tcpv4/passiveOpens", tcpv4.getConnectionsPassive())
.put("sys/tcpv4/attemptFails", tcpv4.getConnectionFailures())
.put("sys/tcpv4/estabResets", tcpv4.getConnectionsReset())
.put("sys/tcpv4/in/segs", tcpv4.getSegmentsReceived())
.put("sys/tcpv4/in/errs", tcpv4.getInErrors())
.put("sys/tcpv4/out/segs", tcpv4.getSegmentsSent())
.put("sys/tcpv4/out/rsts", tcpv4.getOutResets())
.put("sys/tcpv4/retrans/segs", tcpv4.getSegmentsRetransmitted())
.build()
);
if (stats != null) {
for (Map.Entry<String, Long> entry : stats.entrySet()) {
emitter.emit(builder.build(entry.getKey(), entry.getValue()));
}
}
}
}
}
}

View File

@ -46,6 +46,14 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* Deprecated, SysMonitor will now be maintained in {@link OshiSysMonitor}
*
* Sys monitor was implemented using @link org.hyperic.sigar which is no longer maintained.
* {@link oshi} based SysMonitor will be maintained and used from now on, and is implemented in org.apache.druid.java.util.metrics.OshiSysMonitor
*/
@Deprecated
public class SysMonitor extends FeedDefiningMonitor
{
private static final Logger log = new Logger(SysMonitor.class);

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
public class NoopOshiSysMonitorTest
{
@Test
public void testDoMonitor()
{
ServiceEmitter serviceEmitter = Mockito.mock(ServiceEmitter.class);
NoopOshiSysMonitor noopOshiSysMonitor = new NoopOshiSysMonitor();
Assert.assertFalse(noopOshiSysMonitor.doMonitor(serviceEmitter));
}
}

View File

@ -0,0 +1,611 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;
import oshi.hardware.GlobalMemory;
import oshi.hardware.HWDiskStore;
import oshi.hardware.HardwareAbstractionLayer;
import oshi.hardware.NetworkIF;
import oshi.hardware.VirtualMemory;
import oshi.software.os.FileSystem;
import oshi.software.os.InternetProtocolStats;
import oshi.software.os.OSFileStore;
import oshi.software.os.OperatingSystem;
import oshi.util.Util;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class OshiSysMonitorTest
{
private SystemInfo si;
private HardwareAbstractionLayer hal;
private OperatingSystem os;
private enum STATS
{
MEM, SWAP, FS, DISK, NET, CPU, SYS, TCP
}
@Before
public void setUp()
{
si = Mockito.mock(SystemInfo.class);
hal = Mockito.mock(HardwareAbstractionLayer.class);
os = Mockito.mock(OperatingSystem.class);
Mockito.when(si.getHardware()).thenReturn(hal);
Mockito.when(si.getOperatingSystem()).thenReturn(os);
}
@Test
public void testDoMonitor()
{
ServiceEmitter serviceEmitter = Mockito.mock(ServiceEmitter.class);
OshiSysMonitor sysMonitorOshi = new OshiSysMonitor();
serviceEmitter.start();
sysMonitorOshi.monitor(serviceEmitter);
Assert.assertTrue(sysMonitorOshi.doMonitor(serviceEmitter));
}
@Test
public void testDefaultFeedSysMonitorOshi()
{
StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000");
OshiSysMonitor m = new OshiSysMonitor();
m.start();
m.monitor(emitter);
// Sleep for 2 sec to get all metrics which are difference of prev and now metrics
Util.sleep(2000);
m.monitor(emitter);
m.stop();
checkEvents(emitter.getEvents(), "metrics");
}
@Test
public void testMemStats()
{
StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000");
GlobalMemory mem = Mockito.mock(GlobalMemory.class);
Mockito.when(mem.getTotal()).thenReturn(64L);
Mockito.when(mem.getAvailable()).thenReturn(16L);
Mockito.when(hal.getMemory()).thenReturn(mem);
OshiSysMonitor m = new OshiSysMonitor(si);
m.start();
m.monitorMemStats(emitter);
m.stop();
Assert.assertEquals(3, emitter.getEvents().size());
emitter.verifyEmitted("sys/mem/max", 1);
emitter.verifyEmitted("sys/mem/used", 1);
emitter.verifyEmitted("sys/mem/free", 1);
emitter.verifyValue("sys/mem/max", 64L);
emitter.verifyValue("sys/mem/used", 48L);
emitter.verifyValue("sys/mem/free", 16L);
}
@Test
public void testSwapStats()
{
StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000");
GlobalMemory mem = Mockito.mock(GlobalMemory.class);
VirtualMemory swap = Mockito.mock(VirtualMemory.class);
Mockito.when(swap.getSwapPagesIn()).thenReturn(300L);
Mockito.when(swap.getSwapPagesOut()).thenReturn(200L);
Mockito.when(swap.getSwapTotal()).thenReturn(1000L);
Mockito.when(swap.getSwapUsed()).thenReturn(700L);
Mockito.when(mem.getVirtualMemory()).thenReturn(swap);
Mockito.when(hal.getMemory()).thenReturn(mem);
OshiSysMonitor m = new OshiSysMonitor(si);
m.start();
m.monitorSwapStats(emitter);
Assert.assertEquals(4, emitter.getEvents().size());
emitter.verifyEmitted("sys/swap/pageIn", 1);
emitter.verifyEmitted("sys/swap/pageOut", 1);
emitter.verifyEmitted("sys/swap/max", 1);
emitter.verifyEmitted("sys/swap/free", 1);
emitter.verifyValue("sys/swap/pageIn", 300L);
emitter.verifyValue("sys/swap/pageOut", 200L);
emitter.verifyValue("sys/swap/max", 1000L);
emitter.verifyValue("sys/swap/free", 300L);
// Emit again to assert diff in pageIn stats
Mockito.when(swap.getSwapPagesIn()).thenReturn(400L);
Mockito.when(swap.getSwapPagesOut()).thenReturn(250L);
Mockito.when(swap.getSwapUsed()).thenReturn(500L);
emitter.flush();
m.monitorSwapStats(emitter);
emitter.verifyValue("sys/swap/pageIn", 100L);
emitter.verifyValue("sys/swap/pageOut", 50L);
emitter.verifyValue("sys/swap/max", 1000L);
emitter.verifyValue("sys/swap/free", 500L);
m.stop();
}
@Test
public void testFsStats()
{
StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000");
FileSystem fileSystem = Mockito.mock(FileSystem.class);
OSFileStore fs1 = Mockito.mock(OSFileStore.class);
OSFileStore fs2 = Mockito.mock(OSFileStore.class);
Mockito.when(fs1.getTotalSpace()).thenReturn(300L);
Mockito.when(fs1.getUsableSpace()).thenReturn(200L);
Mockito.when(fs1.getTotalInodes()).thenReturn(1000L);
Mockito.when(fs1.getFreeInodes()).thenReturn(700L);
Mockito.when(fs1.getVolume()).thenReturn("/dev/disk1");
Mockito.when(fs1.getMount()).thenReturn("/System/Volumes/boot1");
Mockito.when(fs2.getTotalSpace()).thenReturn(400L);
Mockito.when(fs2.getUsableSpace()).thenReturn(320L);
Mockito.when(fs2.getTotalInodes()).thenReturn(800L);
Mockito.when(fs2.getFreeInodes()).thenReturn(600L);
Mockito.when(fs2.getVolume()).thenReturn("/dev/disk2");
Mockito.when(fs2.getMount()).thenReturn("/System/Volumes/boot2");
List<OSFileStore> osFileStores = ImmutableList.of(fs1, fs2);
Mockito.when(fileSystem.getFileStores(true)).thenReturn(osFileStores);
Mockito.when(os.getFileSystem()).thenReturn(fileSystem);
OshiSysMonitor m = new OshiSysMonitor(si);
m.start();
m.monitorFsStats(emitter);
Assert.assertEquals(8, emitter.getEvents().size());
emitter.verifyEmitted("sys/fs/max", 2);
emitter.verifyEmitted("sys/fs/used", 2);
emitter.verifyEmitted("sys/fs/files/count", 2);
emitter.verifyEmitted("sys/fs/files/free", 2);
Map<String, Object> userDims1 = ImmutableMap.of(
"fsDevName",
"/dev/disk1",
"fsDirName",
"/System/Volumes/boot1"
);
List<Number> metricValues1 = emitter.getMetricValues("sys/fs/max", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(300L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/fs/used", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(100L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/fs/files/count", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(1000L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/fs/files/free", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(700L, metricValues1.get(0));
Map<String, Object> userDims2 = ImmutableMap.of(
"fsDevName",
"/dev/disk2",
"fsDirName",
"/System/Volumes/boot2"
);
List<Number> metricValues2 = emitter.getMetricValues("sys/fs/max", userDims2);
Assert.assertEquals(1, metricValues2.size());
Assert.assertEquals(400L, metricValues2.get(0));
metricValues2 = emitter.getMetricValues("sys/fs/used", userDims2);
Assert.assertEquals(1, metricValues2.size());
Assert.assertEquals(80L, metricValues2.get(0));
metricValues2 = emitter.getMetricValues("sys/fs/files/count", userDims2);
Assert.assertEquals(1, metricValues2.size());
Assert.assertEquals(800L, metricValues2.get(0));
metricValues2 = emitter.getMetricValues("sys/fs/files/free", userDims2);
Assert.assertEquals(1, metricValues2.size());
Assert.assertEquals(600L, metricValues2.get(0));
m.stop();
}
@Test
public void testDiskStats()
{
StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000");
HWDiskStore disk1 = Mockito.mock(HWDiskStore.class);
HWDiskStore disk2 = Mockito.mock(HWDiskStore.class);
Mockito.when(disk1.getReadBytes()).thenReturn(300L);
Mockito.when(disk1.getReads()).thenReturn(200L);
Mockito.when(disk1.getWriteBytes()).thenReturn(400L);
Mockito.when(disk1.getWrites()).thenReturn(500L);
Mockito.when(disk1.getCurrentQueueLength()).thenReturn(100L);
Mockito.when(disk1.getTransferTime()).thenReturn(150L);
Mockito.when(disk1.getName()).thenReturn("disk1");
Mockito.when(disk2.getReadBytes()).thenReturn(2000L);
Mockito.when(disk2.getReads()).thenReturn(3000L);
Mockito.when(disk2.getWriteBytes()).thenReturn(1000L);
Mockito.when(disk2.getWrites()).thenReturn(4000L);
Mockito.when(disk2.getCurrentQueueLength()).thenReturn(750L);
Mockito.when(disk2.getTransferTime()).thenReturn(800L);
Mockito.when(disk2.getName()).thenReturn("disk2");
List<HWDiskStore> hwDiskStores = ImmutableList.of(disk1, disk2);
Mockito.when(hal.getDiskStores()).thenReturn(hwDiskStores);
OshiSysMonitor m = new OshiSysMonitor(si);
m.start();
m.monitorDiskStats(emitter);
Assert.assertEquals(0, emitter.getEvents().size());
Mockito.when(disk1.getReadBytes()).thenReturn(400L);
Mockito.when(disk1.getReads()).thenReturn(220L);
Mockito.when(disk1.getWriteBytes()).thenReturn(600L);
Mockito.when(disk1.getWrites()).thenReturn(580L);
Mockito.when(disk1.getCurrentQueueLength()).thenReturn(300L);
Mockito.when(disk1.getTransferTime()).thenReturn(250L);
Mockito.when(disk2.getReadBytes()).thenReturn(4500L);
Mockito.when(disk2.getReads()).thenReturn(3500L);
Mockito.when(disk2.getWriteBytes()).thenReturn(2300L);
Mockito.when(disk2.getWrites()).thenReturn(5000L);
Mockito.when(disk2.getCurrentQueueLength()).thenReturn(900L);
Mockito.when(disk2.getTransferTime()).thenReturn(1100L);
m.monitorDiskStats(emitter);
Assert.assertEquals(12, emitter.getEvents().size());
Map<String, Object> userDims1 = ImmutableMap.of(
"diskName",
"disk1"
);
List<Number> metricValues1 = emitter.getMetricValues("sys/disk/read/size", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(100L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/disk/read/count", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(20L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/disk/write/size", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(200L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/disk/write/count", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(80L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/disk/queue", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(200L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/disk/transferTime", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(100L, metricValues1.get(0));
Map<String, Object> userDims2 = ImmutableMap.of(
"diskName",
"disk2"
);
List<Number> metricValues2 = emitter.getMetricValues("sys/disk/read/size", userDims2);
Assert.assertEquals(1, metricValues2.size());
Assert.assertEquals(2500L, metricValues2.get(0));
metricValues2 = emitter.getMetricValues("sys/disk/read/count", userDims2);
Assert.assertEquals(1, metricValues2.size());
Assert.assertEquals(500L, metricValues2.get(0));
metricValues2 = emitter.getMetricValues("sys/disk/write/size", userDims2);
Assert.assertEquals(1, metricValues2.size());
Assert.assertEquals(1300L, metricValues2.get(0));
metricValues2 = emitter.getMetricValues("sys/disk/write/count", userDims2);
Assert.assertEquals(1, metricValues2.size());
Assert.assertEquals(1000L, metricValues2.get(0));
metricValues2 = emitter.getMetricValues("sys/disk/queue", userDims2);
Assert.assertEquals(1, metricValues2.size());
Assert.assertEquals(150L, metricValues2.get(0));
metricValues2 = emitter.getMetricValues("sys/disk/transferTime", userDims2);
Assert.assertEquals(1, metricValues2.size());
Assert.assertEquals(300L, metricValues2.get(0));
m.stop();
}
@Test
public void testNetStats()
{
StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000");
NetworkIF net1 = Mockito.mock(NetworkIF.class);
Mockito.when(net1.getBytesRecv()).thenReturn(300L);
Mockito.when(net1.getPacketsRecv()).thenReturn(200L);
Mockito.when(net1.getInErrors()).thenReturn(400L);
Mockito.when(net1.getInDrops()).thenReturn(500L);
Mockito.when(net1.getBytesSent()).thenReturn(100L);
Mockito.when(net1.getPacketsSent()).thenReturn(150L);
Mockito.when(net1.getOutErrors()).thenReturn(200L);
Mockito.when(net1.getCollisions()).thenReturn(20L);
Mockito.when(net1.getName()).thenReturn("Wifi");
Mockito.when(net1.getIPv4addr()).thenReturn(new String[]{"123.456.7.8", "0.0.0.0", "192.1.2.3"});
Mockito.when(net1.getMacaddr()).thenReturn("ha:rd:wa:re:add");
List<NetworkIF> networkIFS = ImmutableList.of(net1);
Mockito.when(hal.getNetworkIFs()).thenReturn(networkIFS);
OshiSysMonitor m = new OshiSysMonitor(si);
m.start();
m.monitorNetStats(emitter);
Assert.assertEquals(0, emitter.getEvents().size());
Mockito.when(net1.getBytesRecv()).thenReturn(400L);
Mockito.when(net1.getPacketsRecv()).thenReturn(220L);
Mockito.when(net1.getInErrors()).thenReturn(600L);
Mockito.when(net1.getInDrops()).thenReturn(580L);
Mockito.when(net1.getBytesSent()).thenReturn(300L);
Mockito.when(net1.getPacketsSent()).thenReturn(250L);
Mockito.when(net1.getOutErrors()).thenReturn(330L);
Mockito.when(net1.getCollisions()).thenReturn(240L);
m.monitorNetStats(emitter);
Assert.assertEquals(16, emitter.getEvents().size()); // 8 * 2 whitelisted ips
Map<String, Object> userDims1 = ImmutableMap.of(
"netName",
"Wifi",
"netAddress",
"123.456.7.8",
"netHwaddr",
"ha:rd:wa:re:add"
);
Map<String, Object> userDims2 = ImmutableMap.of(
"netName",
"Wifi",
"netAddress",
"192.1.2.3",
"netHwaddr",
"ha:rd:wa:re:add"
);
List<Number> metricValues1 = emitter.getMetricValues("sys/net/read/size", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(100L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/read/packets", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(20L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/read/errors", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(200L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/read/dropped", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(80L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/write/size", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(200L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/write/packets", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(100L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/write/errors", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(130L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/write/collisions", userDims1);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(220L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/read/size", userDims2);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(100L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/read/packets", userDims2);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(20L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/read/errors", userDims2);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(200L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/read/dropped", userDims2);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(80L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/write/size", userDims2);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(200L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/write/packets", userDims2);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(100L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/write/errors", userDims2);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(130L, metricValues1.get(0));
metricValues1 = emitter.getMetricValues("sys/net/write/collisions", userDims2);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(220L, metricValues1.get(0));
m.stop();
}
@Test
public void testCpuStats()
{
StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000");
CentralProcessor processor = Mockito.mock(CentralProcessor.class);
long[][] procTicks = new long[][]{
{1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L},
{2L, 4L, 6L, 8L, 10L, 12L, 14L, 16L},
};
Mockito.when(processor.getProcessorCpuLoadTicks()).thenReturn(procTicks);
Mockito.when(hal.getProcessor()).thenReturn(processor);
OshiSysMonitor m = new OshiSysMonitor(si);
m.start();
m.monitorCpuStats(emitter);
Assert.assertEquals(0, emitter.getEvents().size());
long[][] procTicks2 = new long[][]{
{4L, 5L, 6L, 8L, 9L, 7L, 10L, 12L}, // Δtick1 {3,3,3,4,4,1,3,4} _total = 25, emitted percentage
{5L, 8L, 8L, 10L, 15L, 14L, 18L, 22L}, // Δtick2 {3,4,2,2,5,2,4,6} _total = 28
};
Mockito.when(processor.getProcessorCpuLoadTicks()).thenReturn(procTicks2);
m.monitorCpuStats(emitter);
m.stop();
Assert.assertEquals(16, emitter.getEvents().size()); // 8 ticktype * 2 processors
Map<String, Object> userDims = new HashMap<String, Object>();
userDims.put("cpuName", "0");
userDims.put("cpuTime", "user");
List<Number> metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(12L, metricValues1.get(0));
userDims.replace("cpuTime", "nice");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(12L, metricValues1.get(0));
userDims.replace("cpuTime", "sys");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(12L, metricValues1.get(0));
userDims.replace("cpuTime", "idle");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(16L, metricValues1.get(0));
userDims.replace("cpuTime", "wait");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(16L, metricValues1.get(0));
userDims.replace("cpuTime", "irq");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(4L, metricValues1.get(0));
userDims.replace("cpuTime", "softIrq");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(12L, metricValues1.get(0));
userDims.replace("cpuTime", "stolen");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(16L, metricValues1.get(0));
userDims.replace("cpuName", "1");
userDims.replace("cpuTime", "user");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(10L, metricValues1.get(0));
userDims.replace("cpuTime", "nice");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(14L, metricValues1.get(0));
userDims.replace("cpuTime", "sys");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(7L, metricValues1.get(0));
userDims.replace("cpuTime", "idle");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(7L, metricValues1.get(0));
userDims.replace("cpuTime", "wait");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(17L, metricValues1.get(0));
userDims.replace("cpuTime", "irq");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(7L, metricValues1.get(0));
userDims.replace("cpuTime", "softIrq");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(14L, metricValues1.get(0));
userDims.replace("cpuTime", "stolen");
metricValues1 = emitter.getMetricValues("sys/cpu", userDims);
Assert.assertEquals(1, metricValues1.size());
Assert.assertEquals(21L, metricValues1.get(0));
}
@Test
public void testSysStats()
{
StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000");
Mockito.when(os.getSystemUptime()).thenReturn(4000L);
CentralProcessor processor = Mockito.mock(CentralProcessor.class);
double[] la = new double[]{2.31, 4.31, 5.31};
Mockito.when(processor.getSystemLoadAverage(3)).thenReturn(la);
Mockito.when(hal.getProcessor()).thenReturn(processor);
OshiSysMonitor m = new OshiSysMonitor(si);
m.start();
m.monitorSysStats(emitter);
Assert.assertEquals(4, emitter.getEvents().size());
m.stop();
emitter.verifyEmitted("sys/uptime", 1);
emitter.verifyEmitted("sys/la/1", 1);
emitter.verifyEmitted("sys/la/5", 1);
emitter.verifyEmitted("sys/la/15", 1);
emitter.verifyValue("sys/uptime", 4000L);
emitter.verifyValue("sys/la/1", 2.31);
emitter.verifyValue("sys/la/5", 4.31);
emitter.verifyValue("sys/la/15", 5.31);
}
@Test
public void testTcpStats()
{
StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000");
InternetProtocolStats.TcpStats tcpv4 = Mockito.mock(InternetProtocolStats.TcpStats.class);
InternetProtocolStats ipstats = Mockito.mock(InternetProtocolStats.class);
Mockito.when(tcpv4.getConnectionsActive()).thenReturn(10L);
Mockito.when(tcpv4.getConnectionsPassive()).thenReturn(20L);
Mockito.when(tcpv4.getConnectionFailures()).thenReturn(5L);
Mockito.when(tcpv4.getConnectionsReset()).thenReturn(7L);
Mockito.when(tcpv4.getSegmentsReceived()).thenReturn(200L);
Mockito.when(tcpv4.getInErrors()).thenReturn(3L);
Mockito.when(tcpv4.getSegmentsSent()).thenReturn(300L);
Mockito.when(tcpv4.getOutResets()).thenReturn(4L);
Mockito.when(tcpv4.getSegmentsRetransmitted()).thenReturn(8L);
Mockito.when(ipstats.getTCPv4Stats()).thenReturn(tcpv4);
Mockito.when(os.getInternetProtocolStats()).thenReturn(ipstats);
OshiSysMonitor m = new OshiSysMonitor(si);
m.start();
m.monitorTcpStats(emitter);
Assert.assertEquals(0, emitter.getEvents().size());
Mockito.when(tcpv4.getConnectionsActive()).thenReturn(20L);
Mockito.when(tcpv4.getConnectionsPassive()).thenReturn(25L);
Mockito.when(tcpv4.getConnectionFailures()).thenReturn(8L);
Mockito.when(tcpv4.getConnectionsReset()).thenReturn(14L);
Mockito.when(tcpv4.getSegmentsReceived()).thenReturn(350L);
Mockito.when(tcpv4.getInErrors()).thenReturn(4L);
Mockito.when(tcpv4.getSegmentsSent()).thenReturn(500L);
Mockito.when(tcpv4.getOutResets()).thenReturn(7L);
Mockito.when(tcpv4.getSegmentsRetransmitted()).thenReturn(8L);
m.monitorTcpStats(emitter);
m.stop();
Assert.assertEquals(9, emitter.getEvents().size());
emitter.verifyValue("sys/tcpv4/activeOpens", 10L);
emitter.verifyValue("sys/tcpv4/passiveOpens", 5L);
emitter.verifyValue("sys/tcpv4/attemptFails", 3L);
emitter.verifyValue("sys/tcpv4/estabResets", 7L);
emitter.verifyValue("sys/tcpv4/in/segs", 150L);
emitter.verifyValue("sys/tcpv4/in/errs", 1L);
emitter.verifyValue("sys/tcpv4/out/segs", 200L);
emitter.verifyValue("sys/tcpv4/out/rsts", 3L);
emitter.verifyValue("sys/tcpv4/retrans/segs", 0L);
}
private void checkEvents(List<Event> events, String expectedFeed)
{
Assert.assertFalse("no events emitted", events.isEmpty());
for (Event e : events) {
if (!expectedFeed.equals(e.getFeed())) {
String message = StringUtils.format("\"feed\" in event: %s", e.toMap().toString());
Assert.assertEquals(message, expectedFeed, e.getFeed());
}
}
}
}

View File

@ -46,7 +46,9 @@ import org.apache.druid.java.util.metrics.JvmMonitor;
import org.apache.druid.java.util.metrics.JvmThreadsMonitor;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.java.util.metrics.NoopOshiSysMonitor;
import org.apache.druid.java.util.metrics.NoopSysMonitor;
import org.apache.druid.java.util.metrics.OshiSysMonitor;
import org.apache.druid.java.util.metrics.SysMonitor;
import org.apache.druid.query.ExecutorServiceMonitor;
@ -192,4 +194,19 @@ public class MetricsModule implements Module
return new SysMonitor(dimensions);
}
}
@Provides
@ManageLifecycle
public OshiSysMonitor getOshiSysMonitor(DataSourceTaskIdHolder dataSourceTaskIdHolder, @Self Set<NodeRole> nodeRoles)
{
if (nodeRoles.contains(NodeRole.PEON)) {
return new NoopOshiSysMonitor();
} else {
Map<String, String[]> dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(
dataSourceTaskIdHolder.getDataSource(),
dataSourceTaskIdHolder.getTaskId()
);
return new OshiSysMonitor(dimensions);
}
}
}

View File

@ -44,7 +44,9 @@ import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
import org.apache.druid.java.util.metrics.ClockDriftSafeMonitorScheduler;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.java.util.metrics.NoopOshiSysMonitor;
import org.apache.druid.java.util.metrics.NoopSysMonitor;
import org.apache.druid.java.util.metrics.OshiSysMonitor;
import org.apache.druid.java.util.metrics.SysMonitor;
import org.apache.druid.server.DruidNode;
import org.hamcrest.CoreMatchers;
@ -198,6 +200,30 @@ public class MetricsModuleTest
Assert.assertFalse(sysMonitor instanceof NoopSysMonitor);
Mockito.verify(emitter, Mockito.atLeastOnce()).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
}
@Test
public void testGetOshiSysMonitorViaInjector()
{
final Injector injector = createInjector(new Properties(), ImmutableSet.of(NodeRole.PEON));
final OshiSysMonitor sysMonitor = injector.getInstance(OshiSysMonitor.class);
final ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
sysMonitor.doMonitor(emitter);
Assert.assertTrue(sysMonitor instanceof NoopOshiSysMonitor);
Mockito.verify(emitter, Mockito.never()).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
}
@Test
public void testGetOshiSysMonitorWhenNull()
{
Injector injector = createInjector(new Properties(), ImmutableSet.of());
final OshiSysMonitor sysMonitor = injector.getInstance(OshiSysMonitor.class);
final ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
sysMonitor.doMonitor(emitter);
Assert.assertFalse(sysMonitor instanceof NoopOshiSysMonitor);
Mockito.verify(emitter, Mockito.atLeastOnce()).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
}
private static Injector createInjector(Properties properties, ImmutableSet<NodeRole> nodeRoles)
{