BAEL-1420 - introduction to InfluxDB with Java (#3276)
* BAEL-1420 - introduction to InfluxDB with Java * BAEL-1420 - removed unused catches. Shorten batch timer. * BAEL-1420 - add notes to readme.
This commit is contained in:
parent
4f0ae5d2e3
commit
0d22a64cbf
|
@ -0,0 +1,17 @@
|
|||
## Influx SDK Tutorial Project
|
||||
|
||||
### Relevant Article:
|
||||
- [Introduction to using InfluxDB with Java](http://www.baeldung.com/using-influxdb-with-java/)
|
||||
|
||||
### Overview
|
||||
This Maven project contains the Java code for the article linked above.
|
||||
|
||||
### Package Organization
|
||||
Java classes for the intro tutorial are in the
|
||||
org.baeldung.influxdb package.
|
||||
|
||||
|
||||
### Running the tests
|
||||
The test class expects an InfluxDB server to be available on localhost, at the default port of 8086 and with the default "admin" credentials.
|
||||
|
||||
```
|
|
@ -0,0 +1,44 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>influxdb</artifactId>
|
||||
<version>0.1-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
<name>influxdb</name>
|
||||
<description>InfluxDB SDK Tutorial</description>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-modules</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.influxdb</groupId>
|
||||
<artifactId>influxdb-java</artifactId>
|
||||
<version>${influxdb.sdk.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<!-- Check for the most recent available version: https://projectlombok.org/changelog.html -->
|
||||
<version>${lombok.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<influxdb.sdk.version>2.8</influxdb.sdk.version>
|
||||
<lombok.version>1.16.18</lombok.version>
|
||||
|
||||
</properties>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,28 @@
|
|||
package com.baeldung.influxdb;
|
||||
|
||||
import lombok.Data;
|
||||
import org.influxdb.annotation.Column;
|
||||
import org.influxdb.annotation.Measurement;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
@Data
|
||||
@Measurement(name = "memory")
|
||||
public class MemoryPoint {
|
||||
|
||||
@Column(name = "time")
|
||||
private Instant time;
|
||||
|
||||
@Column(name = "name")
|
||||
private String name;
|
||||
|
||||
@Column(name = "free")
|
||||
private Long free;
|
||||
|
||||
@Column(name = "used")
|
||||
private Long used;
|
||||
|
||||
@Column(name = "buffer")
|
||||
private Long buffer;
|
||||
|
||||
}
|
|
@ -0,0 +1,179 @@
|
|||
package com.baeldung.influxdb;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.influxdb.InfluxDB;
|
||||
import org.influxdb.InfluxDBFactory;
|
||||
import org.influxdb.InfluxDBIOException;
|
||||
import org.influxdb.dto.*;
|
||||
import org.influxdb.impl.InfluxDBResultMapper;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@Slf4j
|
||||
public class InfluxDBConnectionLiveTest {
|
||||
|
||||
@Test
|
||||
public void whenCorrectInfoDatabaseConnects() {
|
||||
|
||||
InfluxDB connection = connectDatabase();
|
||||
assertTrue(pingServer(connection));
|
||||
}
|
||||
|
||||
private InfluxDB connectDatabase() {
|
||||
|
||||
// Connect to database assumed on localhost with default credentials.
|
||||
return InfluxDBFactory.connect("http://127.0.0.1:8086", "admin", "admin");
|
||||
|
||||
}
|
||||
|
||||
private boolean pingServer(InfluxDB influxDB) {
|
||||
try {
|
||||
// Ping and check for version string
|
||||
Pong response = influxDB.ping();
|
||||
if (response.getVersion().equalsIgnoreCase("unknown")) {
|
||||
log.error("Error pinging server.");
|
||||
return false;
|
||||
} else {
|
||||
log.info("Database version: {}", response.getVersion());
|
||||
return true;
|
||||
}
|
||||
} catch (InfluxDBIOException idbo) {
|
||||
log.error("Exception while pinging database: ", idbo);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenDatabaseCreatedDatabaseChecksOk() {
|
||||
|
||||
InfluxDB connection = connectDatabase();
|
||||
|
||||
// Create "baeldung and check for it
|
||||
connection.createDatabase("baeldung");
|
||||
assertTrue(connection.databaseExists("baeldung"));
|
||||
|
||||
// Verify that nonsense databases are not there
|
||||
assertFalse(connection.databaseExists("foobar"));
|
||||
|
||||
// Drop "baeldung" and check again
|
||||
connection.deleteDatabase("baeldung");
|
||||
assertFalse(connection.databaseExists("baeldung"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenPointsWrittenPointsExists() throws Exception {
|
||||
|
||||
InfluxDB connection = connectDatabase();
|
||||
|
||||
String dbName = "baeldung";
|
||||
connection.createDatabase(dbName);
|
||||
|
||||
// Need a retention policy before we can proceed
|
||||
connection.createRetentionPolicy("defaultPolicy", "baeldung", "30d", 1, true);
|
||||
|
||||
// Since we are doing a batch thread, we need to set this as a default
|
||||
connection.setRetentionPolicy("defaultPolicy");
|
||||
|
||||
// Enable batch mode
|
||||
connection.enableBatch(10, 10, TimeUnit.MILLISECONDS);
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Point point = Point.measurement("memory")
|
||||
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
|
||||
.addField("name", "server1")
|
||||
.addField("free", 4743656L)
|
||||
.addField("used", 1015096L)
|
||||
.addField("buffer", 1010467L)
|
||||
.build();
|
||||
|
||||
connection.write(dbName, "defaultPolicy", point);
|
||||
Thread.sleep(2);
|
||||
|
||||
}
|
||||
|
||||
// Unfortunately, the sleep inside the loop doesn't always add enough time to insure
|
||||
// that Influx's batch thread flushes all of the writes and this sometimes fails without
|
||||
// another brief pause.
|
||||
Thread.sleep(10);
|
||||
|
||||
List<MemoryPoint> memoryPointList = getPoints(connection, "Select * from memory", "baeldung");
|
||||
|
||||
assertEquals(10, memoryPointList.size());
|
||||
|
||||
// Turn off batch and clean up
|
||||
connection.disableBatch();
|
||||
connection.deleteDatabase("baeldung");
|
||||
connection.close();
|
||||
|
||||
}
|
||||
|
||||
private List<MemoryPoint> getPoints(InfluxDB connection, String query, String databaseName) {
|
||||
|
||||
// Run the query
|
||||
Query queryObject = new Query(query, databaseName);
|
||||
QueryResult queryResult = connection.query(queryObject);
|
||||
|
||||
// Map it
|
||||
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
|
||||
return resultMapper.toPOJO(queryResult, MemoryPoint.class);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void whenBatchWrittenBatchExists() {
|
||||
|
||||
InfluxDB connection = connectDatabase();
|
||||
|
||||
String dbName = "baeldung";
|
||||
connection.createDatabase(dbName);
|
||||
|
||||
// Need a retention policy before we can proceed
|
||||
// Since we are doing batches, we need not set it
|
||||
connection.createRetentionPolicy("defaultPolicy", "baeldung", "30d", 1, true);
|
||||
|
||||
|
||||
BatchPoints batchPoints = BatchPoints
|
||||
.database(dbName)
|
||||
.retentionPolicy("defaultPolicy")
|
||||
.build();
|
||||
Point point1 = Point.measurement("memory")
|
||||
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
|
||||
.addField("free", 4743656L)
|
||||
.addField("used", 1015096L)
|
||||
.addField("buffer", 1010467L)
|
||||
.build();
|
||||
Point point2 = Point.measurement("memory")
|
||||
.time(System.currentTimeMillis() - 100, TimeUnit.MILLISECONDS)
|
||||
.addField("free", 4743696L)
|
||||
.addField("used", 1016096L)
|
||||
.addField("buffer", 1008467L)
|
||||
.build();
|
||||
batchPoints.point(point1);
|
||||
batchPoints.point(point2);
|
||||
connection.write(batchPoints);
|
||||
|
||||
List<MemoryPoint> memoryPointList = getPoints(connection, "Select * from memory", "baeldung");
|
||||
|
||||
assertEquals(2, memoryPointList.size());
|
||||
assertTrue(4743696L == memoryPointList.get(0).getFree());
|
||||
|
||||
|
||||
memoryPointList = getPoints(connection, "Select * from memory order by time desc", "baeldung");
|
||||
|
||||
assertEquals(2, memoryPointList.size());
|
||||
assertTrue(4743656L == memoryPointList.get(0).getFree());
|
||||
|
||||
// Clean up database
|
||||
connection.deleteDatabase("baeldung");
|
||||
connection.close();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>web - %date [%thread] %-5level %logger{36} - %message%n
|
||||
</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT" />
|
||||
</root>
|
||||
</configuration>
|
Loading…
Reference in New Issue