[BAEL-5012] Intro to ksqlDB (#11113)
* [BAEL-5012] Intro to ksqlDB * [BAEL-5012] Fix POM file and code cleanup * [BAEL-5012] Code cleanup
This commit is contained in:
parent
179b9ef2fd
commit
8900c1ebba
|
@ -0,0 +1,74 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>ksqldb-app</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>ksqldb</name>

    <parent>
        <groupId>com.baeldung</groupId>
        <artifactId>parent-modules</artifactId>
        <version>1.0.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <!-- ksqldb-api-client is not published to Maven Central, only to Confluent's repository. -->
    <repositories>
        <repository>
            <id>confluent</id>
            <name>confluent-repo</name>
            <!-- Must be https: Maven 3.8.1+ blocks artifact downloads over plain http. -->
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-api-client</artifactId>
            <version>${ksqldb.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>

        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>${awaitility.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>${assertj.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <version>${testcontainers.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>${testcontainers.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <properties>
        <ksqldb.version>6.2.0</ksqldb.version>
        <assertj.version>3.20.2</assertj.version>
        <awaitility.version>4.1.0</awaitility.version>
        <testcontainers.version>1.15.3</testcontainers.version>
    </properties>

</project>
|
|
@ -0,0 +1,29 @@
|
||||||
|
package com.baeldung.ksqldb;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Data object deserialized (via Jackson) from one JSON row of the ksqlDB {@code alerts} table.
 *
 * <p>Lombok generates getters/setters/equals/hashCode ({@code @Data}), a fluent builder
 * ({@code @Builder}) and both constructors. {@code @NoArgsConstructor} is required for
 * Jackson deserialization; unknown JSON fields (e.g. windowing metadata) are ignored.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class Alert {

    // Upper-case names map the JSON keys of the returned row to these fields
    // (presumably ksqlDB upper-cases unquoted column identifiers — confirm against server output).
    @JsonProperty(value = "SENSOR_ID")
    private String sensorId;

    // Start of the tumbling window, formatted 'yyyy-MM-dd HH:mm:ss'.
    @JsonProperty(value = "START_PERIOD")
    private String startPeriod;

    // End of the tumbling window, formatted 'yyyy-MM-dd HH:mm:ss'.
    @JsonProperty(value = "END_PERIOD")
    private String endPeriod;

    // Average sensor reading within the window.
    @JsonProperty(value = "AVERAGE_READING")
    private double averageReading;

}
|
|
@ -0,0 +1,75 @@
|
||||||
|
package com.baeldung.ksqldb;

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ExecuteStatementResult;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.Row;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Thin asynchronous facade over the ksqlDB Java {@link Client}: creates the
 * {@code readings} stream and the {@code alerts} materialized view, inserts rows,
 * and subscribes to the alerts push query. All methods return the client's
 * {@link CompletableFuture}s; nothing blocks here.
 */
@AllArgsConstructor
@Slf4j
public class KsqlDBApplication {

    // DDL: stream over the 'readings' Kafka topic (created with 1 partition if absent);
    // event time is taken from the 'timestamp' field using the given format.
    private static final String CREATE_READINGS_STREAM = ""
        + " CREATE STREAM readings (sensor_id VARCHAR KEY, timestamp VARCHAR, reading INT)"
        + " WITH (KAFKA_TOPIC = 'readings',"
        + " VALUE_FORMAT = 'JSON',"
        + " TIMESTAMP = 'timestamp',"
        + " TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss',"
        + " PARTITIONS = 1);";

    // DDL: materialized view — 30-minute tumbling-window average per sensor,
    // keeping only windows whose average exceeds the alert threshold of 25.
    private static final String CREATE_ALERTS_TABLE = ""
        + " CREATE TABLE alerts AS"
        + " SELECT"
        + " sensor_id,"
        + " TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS start_period,"
        + " TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS end_period,"
        + " AVG(reading) AS average_reading"
        + " FROM readings"
        + " WINDOW TUMBLING (SIZE 30 MINUTES)"
        + " GROUP BY sensor_id"
        + " HAVING AVG(reading) > 25"
        + " EMIT CHANGES;";

    // Push query: emits every change to the alerts table as it happens.
    private static final String ALERTS_QUERY = "SELECT * FROM alerts EMIT CHANGES;";

    private static final String READINGS_STREAM = "readings";

    // Start reading each topic from the beginning so previously produced rows are seen.
    private static final Map<String, Object> PROPERTIES = Collections.singletonMap("auto.offset.reset", "earliest");

    private final Client client;

    /** Creates the 'readings' stream; completes when the server has executed the DDL. */
    public CompletableFuture<ExecuteStatementResult> createReadingsStream() {
        return client.executeStatement(CREATE_READINGS_STREAM, PROPERTIES);
    }

    /** Creates the 'alerts' materialized view over the readings stream. */
    public CompletableFuture<ExecuteStatementResult> createAlertsTable() {
        return client.executeStatement(CREATE_ALERTS_TABLE, PROPERTIES);
    }

    /**
     * Inserts all rows into the readings stream.
     *
     * @param rows key/value rows matching the stream's schema
     * @return a future that completes when every individual insert has completed
     */
    public CompletableFuture<Void> insert(Collection<KsqlObject> rows) {
        return CompletableFuture.allOf(
            rows.stream()
                .map(row -> client.insertInto(READINGS_STREAM, row))
                .toArray(CompletableFuture[]::new)
        );
    }

    /**
     * Starts the alerts push query and attaches the given subscriber to its row stream.
     * Failures to start the query are logged, not rethrown.
     */
    public CompletableFuture<Void> subscribeOnAlerts(Subscriber<Row> subscriber) {
        return client.streamQuery(ALERTS_QUERY, PROPERTIES)
            .thenAccept(streamedQueryResult -> streamedQueryResult.subscribe(subscriber))
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Alerts push query failed", ex);
                }
            });
    }

}
|
|
@ -0,0 +1,10 @@
|
||||||
|
package com.baeldung.ksqldb;

import lombok.Data;

/**
 * Simple data holder for one sensor reading; fields mirror the columns of the
 * ksqlDB {@code readings} stream (sensor id, event timestamp, integer reading).
 * Lombok {@code @Data} generates getters, setters, equals/hashCode and toString.
 */
@Data
public class Reading {
    private String id;
    // Event time, formatted 'yyyy-MM-dd HH:mm:ss' to match the stream's TIMESTAMP_FORMAT.
    private String timestamp;
    private int reading;
}
|
|
@ -0,0 +1,60 @@
|
||||||
|
package com.baeldung.ksqldb;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.api.client.Row;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Reactive Streams subscriber that converts each ksqlDB {@link Row} to an instance
 * of {@code T} via Jackson and collects the results for later inspection.
 *
 * <p>Rows are requested one at a time (pull-based back-pressure). JSON rows that
 * fail to parse are logged and skipped rather than terminating the subscription.
 */
@Slf4j
public class RowSubscriber<T> implements Subscriber<Row> {

    // Jackson mappers are thread-safe and expensive to create; share one instance.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final Class<T> clazz;

    private Subscription subscription;

    // Appended on the client's delivery thread (inside synchronized onNext) but read
    // from a different thread by the polling test code, so the list itself must be
    // thread-safe — a plain ArrayList here is a data race.
    public List<T> consumedItems = Collections.synchronizedList(new ArrayList<>());

    /**
     * @param clazz target type each row's JSON is deserialized into
     */
    public RowSubscriber(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public synchronized void onSubscribe(Subscription subscription) {
        log.info("Subscriber is subscribed.");
        this.subscription = subscription;
        // Kick off delivery by requesting the first row.
        subscription.request(1);
    }

    @Override
    public synchronized void onNext(Row row) {
        String jsonString = row.asObject().toJsonString();
        log.info("Row JSON: {}", jsonString);
        try {
            T item = OBJECT_MAPPER.readValue(jsonString, this.clazz);
            log.info("Item: {}", item);
            consumedItems.add(item);
        } catch (JsonProcessingException e) {
            // Best-effort: skip malformed rows, keep the subscription alive.
            log.error("Unable to parse json", e);
        }

        // Request the next row
        subscription.request(1);
    }

    @Override
    public synchronized void onError(Throwable t) {
        log.error("Received an error", t);
    }

    @Override
    public synchronized void onComplete() {
        log.info("Query has ended.");
    }
}
|
|
@ -0,0 +1,160 @@
|
||||||
|
package com.baeldung.ksqldb;

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.QueryInfo.QueryType;
import io.confluent.ksql.api.client.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.given;

/**
 * Live integration test: spins up Zookeeper, a Kafka broker and a ksqlDB server via
 * docker-compose (Testcontainers), then exercises push and pull queries end to end.
 * Requires a local Docker daemon; container startup can take minutes.
 */
@Testcontainers
class KsqlDBApplicationLiveTest {

    private static final File KSQLDB_COMPOSE_FILE = new File("src/test/resources/docker/docker-compose.yml");

    // Read topics from the beginning so data inserted before a query starts is still seen.
    private static final Map<String, Object> PROPERTIES = Collections.singletonMap("auto.offset.reset", "earliest");

    private static final String KSQLDB_SERVER_HOST = "localhost";
    private static final int KSQLDB_SERVER_PORT = 8088;

    // Shared across tests (static): the compose stack is started once, and the test
    // blocks until the ksqldb-server healthcheck passes (up to 5 minutes).
    @Container
    public static DockerComposeContainer dockerComposeContainer =
        new DockerComposeContainer<>(KSQLDB_COMPOSE_FILE)
            .withServices("zookeeper", "broker", "ksqldb-server")
            .withExposedService("ksqldb-server", 8088,
                Wait.forHealthcheck().withStartupTimeout(Duration.ofMinutes(5)))
            .withLocalCompose(true);

    private KsqlDBApplication ksqlDBApplication;

    private Client client;

    // Fresh client + application facade per test.
    @BeforeEach
    void setup() {
        ClientOptions options = ClientOptions.create()
            .setHost(KSQLDB_SERVER_HOST)
            .setPort(KSQLDB_SERVER_PORT);
        client = Client.create(options);

        ksqlDBApplication = new KsqlDBApplication(client);
    }

    // Drop the stream/table (and their topics) so each test starts from a clean server.
    @AfterEach
    void tearDown() {
        deleteAlerts();
    }

    @Test
    void givenSensorReadings_whenSubscribedToAlerts_thenAlertsAreConsumed() {
        createAlertsMaterializedView();
        RowSubscriber<Alert> alertSubscriber = new RowSubscriber<>(Alert.class);

        // Subscribe first, then insert, so the push query observes the new rows.
        CompletableFuture<Void> result = ksqlDBApplication.subscribeOnAlerts(alertSubscriber);
        insertSampleData();

        assertThat(result).isNotNull();
        // Alerts arrive asynchronously; poll until both expected window alerts are consumed.
        await().atMost(Duration.ofMinutes(3)).untilAsserted(() ->
            assertThat(alertSubscriber.consumedItems)
                .containsOnly(
                    expectedAlert("sensor-1", "2021-08-01 09:30:00", "2021-08-01 10:00:00", 28.0),
                    expectedAlert("sensor-2", "2021-08-01 10:00:00", "2021-08-01 10:30:00", 26.0)
                )
        );
    }

    @Test
    void givenSensorReadings_whenPullQueryForRow_thenRowIsReturned() {
        createAlertsMaterializedView();
        insertSampleData();

        String pullQuery = "SELECT * FROM alerts WHERE sensor_id = 'sensor-2';";

        given().ignoreExceptions()
            .await().atMost(Duration.ofMinutes(1))
            .untilAsserted(() -> {
                // it may be possible that the materialized view is not updated with sample data yet
                // so ignore TimeoutException and try again
                List<Row> rows = client.executeQuery(pullQuery, PROPERTIES)
                    .get(10, TimeUnit.SECONDS);

                assertThat(rows).hasSize(1);

                Row row = rows.get(0);
                assertThat(row.getString("SENSOR_ID")).isEqualTo("sensor-2");
                assertThat(row.getString("START_PERIOD")).isEqualTo("2021-08-01 10:00:00");
                assertThat(row.getString("END_PERIOD")).isEqualTo("2021-08-01 10:30:00");
                assertThat(row.getDouble("AVERAGE_READING")).isEqualTo(26.0);
            });
    }

    // Stream must exist before the table that selects from it; join() blocks until done.
    private void createAlertsMaterializedView() {
        ksqlDBApplication.createReadingsStream().join();
        ksqlDBApplication.createAlertsTable().join();
    }

    // Inserts readings across several 30-minute windows; only two windows average above 25.
    private void insertSampleData() {
        ksqlDBApplication.insert(
            Arrays.asList(
                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:00:00").put("reading", 22),
                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:10:00").put("reading", 20),
                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:20:00").put("reading", 20),

                // these reading will exceed the alert threshold (sensor-1)
                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:30:00").put("reading", 24),
                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:40:00").put("reading", 30),
                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:50:00").put("reading", 30),

                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 10:00:00").put("reading", 24),

                // these reading will exceed the alert threshold (sensor-2)
                new KsqlObject().put("sensor_id", "sensor-2").put("timestamp", "2021-08-01 10:00:00").put("reading", 26),
                new KsqlObject().put("sensor_id", "sensor-2").put("timestamp", "2021-08-01 10:10:00").put("reading", 26),
                new KsqlObject().put("sensor_id", "sensor-2").put("timestamp", "2021-08-01 10:20:00").put("reading", 26),

                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 10:30:00").put("reading", 24)
            )
        ).join();
    }

    // The alerts table is backed by a persistent query, which must be terminated
    // before the table (and then the stream) can be dropped.
    private void deleteAlerts() {
        client.listQueries()
            .thenApply(queryInfos -> queryInfos.stream()
                .filter(queryInfo -> queryInfo.getQueryType() == QueryType.PERSISTENT)
                .map(QueryInfo::getId)
                .findFirst()
                .orElseThrow(() -> new RuntimeException("Persistent query not found")))
            .thenCompose(id -> client.executeStatement("TERMINATE " + id + ";"))
            .thenCompose(result -> client.executeStatement("DROP TABLE alerts DELETE TOPIC;"))
            .thenCompose(result -> client.executeStatement("DROP STREAM readings DELETE TOPIC;"))
            .join();
    }

    // Builds the Alert expected from a given window; averageReading is compared as a double.
    private Alert expectedAlert(String sensorId, String startPeriod, String endPeriod, double average) {
        return Alert.builder()
            .sensorId(sensorId)
            .startPeriod(startPeriod)
            .endPeriod(endPeriod)
            .averageReading(average)
            .build();
    }
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
---
version: '3'

services:
  # Coordination service required by this (pre-KRaft) Kafka broker image.
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  # Single-node Kafka broker; replication factors are 1 because there is only one broker.
  broker:
    image: confluentinc/cp-kafka:6.2.0
    hostname: broker
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # broker:9092 for containers on the compose network, localhost:29092 for the host.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  # ksqlDB server; port 8088 is exposed for the Java client in the live test.
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.19.0
    hostname: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    # Testcontainers waits on this healthcheck before running the tests.
    healthcheck:
      test: curl -f http://ksqldb-server:8088/ || exit 1
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  # Interactive CLI for manual exploration; kept alive with a tty so it can be exec'd into.
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.19.0
    hostname: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
|
@ -0,0 +1,6 @@
|
||||||
|
# Log4j 1.x configuration for the tests: INFO and above, to the console only.
log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# e.g. "2021-08-01 10:00:00 INFO  RowSubscriber - Row JSON: {...}"
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n
3
pom.xml
3
pom.xml
|
@ -472,6 +472,7 @@
|
||||||
<module>jsoup</module>
|
<module>jsoup</module>
|
||||||
<module>jta</module>
|
<module>jta</module>
|
||||||
<module>kubernetes</module>
|
<module>kubernetes</module>
|
||||||
|
<module>ksqldb</module>
|
||||||
<!-- <module>lagom</module> --> <!-- Not a maven project -->
|
<!-- <module>lagom</module> --> <!-- Not a maven project -->
|
||||||
<module>language-interop</module>
|
<module>language-interop</module>
|
||||||
<module>libraries-2</module>
|
<module>libraries-2</module>
|
||||||
|
@ -940,6 +941,8 @@
|
||||||
<module>jsoup</module>
|
<module>jsoup</module>
|
||||||
<module>jta</module>
|
<module>jta</module>
|
||||||
|
|
||||||
|
<module>ksqldb</module>
|
||||||
|
|
||||||
<!-- <module>lagom</module> --> <!-- Not a maven project -->
|
<!-- <module>lagom</module> --> <!-- Not a maven project -->
|
||||||
<module>libraries-2</module>
|
<module>libraries-2</module>
|
||||||
<module>libraries-3</module>
|
<module>libraries-3</module>
|
||||||
|
|
Loading…
Reference in New Issue