BAEL-4982 Batching In Cassandra (#11574)

* Implemented cassandra batch query

* Added netty version param

* Reformatted correctly

* Reformatted correctly

* Reformatted correctly

* Formatting fix resolved

* Formatting fix resolved

* Removed unused method

* Refactored method for better readability

* tab spaces corrected

Co-authored-by: saikat chakraborty <saikat.chakraborty@tesco.com>
This commit is contained in:
Saikat Chakraborty 2022-01-20 19:10:00 +05:30 committed by GitHub
parent 5a3494c262
commit 49319e7eb0
7 changed files with 525 additions and 1 deletions

View File

@ -43,6 +43,11 @@
<artifactId>java-driver-query-builder</artifactId>
<version>${datastax-cassandra.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty-transport-version}</version>
</dependency>
</dependencies>
<properties>
@ -50,6 +55,7 @@
<cassandra-driver-core.version>3.1.2</cassandra-driver-core.version>
<cassandra-unit.version>3.1.1.0</cassandra-unit.version>
<datastax-cassandra.version>4.1.0</datastax-cassandra.version>
<netty-transport-version>4.1.71.Final</netty-transport-version>
</properties>
</project>
</project>

View File

@ -0,0 +1,60 @@
package com.baeldung.cassandra.batch;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.baeldung.cassandra.batch.domain.Product;
import com.baeldung.cassandra.batch.repository.KeyspaceRepository;
import com.baeldung.cassandra.batch.repository.ProductRepository;
import com.datastax.oss.driver.api.core.CqlSession;
public class Application {
private static final Logger LOG = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
new Application().run();
}
public void run() {
CassandraConnector connector = new CassandraConnector();
connector.connect("127.0.0.1", 9042, "datacenter1");
CqlSession session = connector.getSession();
KeyspaceRepository keyspaceRepository = new KeyspaceRepository(session);
keyspaceRepository.createKeyspace("testKeyspace", 1);
keyspaceRepository.useKeyspace("testKeyspace");
ProductRepository productRepository = new ProductRepository(session);
productRepository.createProductTable("testKeyspace");
productRepository.createProductByIdTable("testKeyspace");
productRepository.createProductByIdTable("testKeyspace");
Product product = getProduct();
productRepository.insertProductBatch(product);
Product productV1 = getProduct();
Product productV2 = getProduct();
productRepository.insertProductVariantBatch(productV1, productV2);
List<Product> products = productRepository.selectAllProduct("testKeyspace");
products.forEach(x -> LOG.info(x.toString()));
connector.close();
}
private Product getProduct() {
Product product = new Product();
product.setProductName("Banana");
product.setDescription("Banana");
product.setPrice(12f);
return product;
}
}

View File

@ -0,0 +1,31 @@
package com.baeldung.cassandra.batch;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import java.net.InetSocketAddress;
import org.apache.commons.lang3.StringUtils;
public class CassandraConnector {
private CqlSession session;
public void connect(final String node, final Integer port, final String dataCenter) {
CqlSessionBuilder builder = CqlSession.builder();
builder.addContactPoint(new InetSocketAddress(node, port));
if (StringUtils.isNotBlank(dataCenter)) {
builder.withLocalDatacenter(dataCenter);
}
session = builder.build();
}
public CqlSession getSession() {
return this.session;
}
public void close() {
session.close();
}
}

View File

@ -0,0 +1,79 @@
package com.baeldung.cassandra.batch.domain;
import java.util.UUID;
public class Product {
public Product() {
super();
}
public Product(UUID productId, UUID variantId, String productName, String description, float price) {
super();
this.productId = productId;
this.variantId = variantId;
this.productName = productName;
this.description = description;
this.price = price;
}
public Product(UUID productId, String productName, String description, float price) {
super();
this.productId = productId;
this.productName = productName;
this.description = description;
this.price = price;
}
private UUID productId;
private UUID variantId;
private String productName;
private String description;
private float price;
public UUID getProductId() {
return productId;
}
public void setProductId(UUID productId) {
this.productId = productId;
}
public UUID getVariantId() {
return variantId;
}
public void setVariantId(UUID variantId) {
this.variantId = variantId;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public float getPrice() {
return price;
}
public void setPrice(float price) {
this.price = price;
}
@Override
public String toString() {
return "Product [productId=" + productId + ", variantId=" + variantId + ", productName=" + productName
+ ", description=" + description + ", price=" + price + "]";
}
}

View File

@ -0,0 +1,27 @@
package com.baeldung.cassandra.batch.repository;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace;
public class KeyspaceRepository {
private final CqlSession session;
public KeyspaceRepository(CqlSession session) {
this.session = session;
}
public void createKeyspace(String keyspaceName, int numberOfReplicas) {
CreateKeyspace createKeyspace = SchemaBuilder.createKeyspace(keyspaceName)
.ifNotExists()
.withSimpleStrategy(numberOfReplicas);
session.execute(createKeyspace.build());
}
public void useKeyspace(String keyspace) {
session.execute("USE " + CqlIdentifier.fromCql(keyspace));
}
}

View File

@ -0,0 +1,188 @@
package com.baeldung.cassandra.batch.repository;
import com.baeldung.cassandra.batch.domain.Product;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTable;
import com.datastax.oss.driver.api.querybuilder.select.Select;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class ProductRepository {
private static final String PRODUCT_TABLE_NAME = "product";
private static final String PRODUCT_BY_ID_TABLE_NAME = "product_by_id";
private static final String PRODUCT_BY_NAME_TABLE_NAME = "product_by_name";
private final CqlSession session;
public ProductRepository(CqlSession session) {
this.session = session;
}
public void createProductTable(String keyspace) {
CreateTable createTable = SchemaBuilder.createTable(PRODUCT_TABLE_NAME).ifNotExists()
.withPartitionKey("product_id", DataTypes.UUID)
.withClusteringColumn("variant_id", DataTypes.UUID)
.withColumn("product_name", DataTypes.TEXT)
.withColumn("description", DataTypes.TEXT)
.withColumn("price", DataTypes.FLOAT);
executeStatement(createTable.build(), keyspace);
}
public void createProductByIdTable(String keyspace) {
CreateTable createTable = SchemaBuilder.createTable(PRODUCT_BY_ID_TABLE_NAME).ifNotExists()
.withPartitionKey("product_id", DataTypes.UUID)
.withColumn("product_name", DataTypes.TEXT)
.withColumn("title", DataTypes.TEXT)
.withColumn("description", DataTypes.TEXT)
.withColumn("price", DataTypes.FLOAT);
executeStatement(createTable.build(), keyspace);
}
public void createProductTableByName(String keyspace) {
CreateTable createTable = SchemaBuilder.createTable(PRODUCT_BY_NAME_TABLE_NAME).ifNotExists()
.withPartitionKey("product_name", DataTypes.TEXT)
.withColumn("product_id", DataTypes.UUID)
.withColumn("description", DataTypes.TEXT)
.withColumn("price", DataTypes.FLOAT);
executeStatement(createTable.build(), keyspace);
}
/**
* Insert two variant Product into same table using a batch query.
*
* @param Product
*/
public void insertProductVariantBatch(Product productVariant1,Product productVariant2) {
UUID productId = UUID.randomUUID();
BoundStatement productBoundStatement1 = this.getProductVariantInsertStatement(productVariant1,productId);
BoundStatement productBoundStatement2 = this.getProductVariantInsertStatement(productVariant2,productId);
BatchStatement batch = BatchStatement.newInstance(DefaultBatchType.UNLOGGED,
productBoundStatement1,productBoundStatement2);
session.execute(batch);
}
/**
* Insert two same Product into related tables using a batch query.
*
* @param book
*/
public void insertProductBatch(Product product) {
UUID productId = UUID.randomUUID();
BoundStatement productBoundStatement1 = this.getProductInsertStatement(product,productId,PRODUCT_BY_ID_TABLE_NAME);
BoundStatement productBoundStatement2 = this.getProductInsertStatement(product,productId,PRODUCT_BY_NAME_TABLE_NAME);
BatchStatement batch = BatchStatement.newInstance(DefaultBatchType.LOGGED,
productBoundStatement1,productBoundStatement2);
session.execute(batch);
}
public List<Product> selectAllProduct(String keyspace) {
Select select = QueryBuilder.selectFrom(PRODUCT_TABLE_NAME).all();
ResultSet resultSet = executeStatement(select.build(), keyspace);
List<Product> result = new ArrayList<>();
resultSet.forEach(x -> result.add(new Product(x.getUuid("product_id"), x.getUuid("variant_id"),
x.getString("product_name"), x.getString("description"), x.getFloat("price"))));
return result;
}
public List<Product> selectAllProductByName(String keyspace) {
Select select = QueryBuilder.selectFrom(PRODUCT_BY_NAME_TABLE_NAME).all();
ResultSet resultSet = executeStatement(select.build(), keyspace);
List<Product> result = new ArrayList<>();
resultSet.forEach(x -> result.add(new Product(x.getUuid("product_id"),
x.getString("product_name"), x.getString("description"), x.getFloat("price"))));
return result;
}
public List<Product> selectAllProductById(String keyspace) {
Select select = QueryBuilder.selectFrom(PRODUCT_BY_ID_TABLE_NAME).all();
ResultSet resultSet = executeStatement(select.build(), keyspace);
List<Product> result = new ArrayList<>();
resultSet.forEach(x -> result.add(new Product(x.getUuid("product_id"),
x.getString("product_name"), x.getString("description"), x.getFloat("price"))));
return result;
}
/**
* Delete table.
*
* @param tableName the name of the table to delete.
*/
public void deleteTable(String tableName) {
StringBuilder sb = new StringBuilder("DROP TABLE IF EXISTS ").append(tableName);
final String query = sb.toString();
session.execute(query);
}
private ResultSet executeStatement(SimpleStatement statement, String keyspace) {
if (keyspace != null) {
statement.setKeyspace(CqlIdentifier.fromCql(keyspace));
}
return session.execute(statement);
}
private BoundStatement getProductVariantInsertStatement(Product product,UUID productId) {
String insertQuery = new StringBuilder("").append("INSERT INTO ").append(PRODUCT_TABLE_NAME)
.append("(product_id,variant_id,product_name,description,price) ").append("VALUES (").append(":product_id")
.append(", ").append(":variant_id").append(", ").append(":product_name").append(", ")
.append(":description").append(", ").append(":price").append(");").toString();
PreparedStatement preparedStatement = session.prepare(insertQuery);
return preparedStatement.bind(productId, UUID.randomUUID(),
product.getProductName(),
product.getDescription(),
product.getPrice());
}
private BoundStatement getProductInsertStatement(Product product,UUID productId,String productTableName) {
String cqlQuery1 = new StringBuilder("").append("INSERT INTO ").append(productTableName)
.append("(product_id,product_name,description,price) ").append("VALUES (").append(":product_id")
.append(", ").append(":product_name").append(", ").append(":description").append(", ")
.append(":price").append(");").toString();
PreparedStatement preparedStatement = session.prepare(cqlQuery1);
return preparedStatement.bind(productId,
product.getProductName(),
product.getDescription(),
product.getPrice());
}
}

View File

@ -0,0 +1,133 @@
package com.baeldung.cassandra.batch.epository;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.thrift.transport.TTransportException;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import com.baeldung.cassandra.batch.CassandraConnector;
import com.baeldung.cassandra.batch.domain.Product;
import com.baeldung.cassandra.batch.repository.KeyspaceRepository;
import com.baeldung.cassandra.batch.repository.ProductRepository;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
import com.datastax.oss.driver.api.core.cql.ResultSet;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ProductRepositoryIntegrationTest {
private KeyspaceRepository schemaRepository;
private ProductRepository productRepository;
private CqlSession session;
private final String KEYSPACE_NAME = "testBaeldungKeyspace";
private final String PRODUCT = "product";
@BeforeClass
public static void init() throws ConfigurationException, TTransportException, IOException, InterruptedException {
// Start an embedded Cassandra Server
EmbeddedCassandraServerHelper.startEmbeddedCassandra(20000L);
}
@Before
public void connect() {
CassandraConnector client = new CassandraConnector();
client.connect("127.0.0.1", 9142,"datacenter1");
session = client.getSession();
schemaRepository = new KeyspaceRepository(client.getSession());
schemaRepository.createKeyspace(KEYSPACE_NAME, 1);
schemaRepository.useKeyspace(KEYSPACE_NAME);
productRepository = new ProductRepository(client.getSession());
}
@Test
public void whenCreatingAProductTable_thenCreatedCorrectly() {
productRepository.deleteTable(KEYSPACE_NAME);
productRepository.createProductTable(KEYSPACE_NAME);
ResultSet result = session.execute("SELECT * FROM " + KEYSPACE_NAME + "." + PRODUCT + ";");
List<ColumnDefinition> colDef = new ArrayList<>();
result.getColumnDefinitions().forEach(columnDef -> colDef.add(columnDef));
List<String> columnNames = colDef.stream().map(ColumnDefinition::getName).map(CqlIdentifier::toString).collect(Collectors.toList());
assertEquals(columnNames.size(), 5);
assertTrue(columnNames.contains("product_id"));
assertTrue(columnNames.contains("variant_id"));
assertTrue(columnNames.contains("product_name"));
assertTrue(columnNames.contains("description"));
assertTrue(columnNames.contains("price"));
}
@Test
public void whenCreatingRelatedProductBatch_thenCreatedCorrectly() {
productRepository.deleteTable(KEYSPACE_NAME);
productRepository.createProductTableByName(KEYSPACE_NAME);
productRepository.createProductByIdTable(KEYSPACE_NAME);
Product product = getTestProduct();
productRepository.insertProductBatch(product);
List<Product> productByIdList = productRepository.selectAllProductById(KEYSPACE_NAME);
List<Product> productByNameList = productRepository.selectAllProductByName(KEYSPACE_NAME);
assertEquals(productByIdList.size(), 1);
assertEquals(productByNameList.size(), 1);
assertEquals(productByIdList.get(0).getProductName(), "Banana");
assertEquals(productByNameList.get(0).getProductName(), "Banana");
assertEquals(productByIdList.get(0).getDescription(), "Banana");
assertEquals(productByNameList.get(0).getDescription(), "Banana");
assertEquals(productByIdList.get(0).getPrice(), 12f,0f);
assertEquals(productByNameList.get(0).getPrice(), 12f,0f);
}
@Test
public void whenCreatingMultiVariantProductBatch_thenCreatedCorrectly() {
productRepository.deleteTable(KEYSPACE_NAME);
productRepository.createProductTable(KEYSPACE_NAME);
Product productV1 = getTestProduct();
Product productV2 = getTestProduct();
productRepository.insertProductVariantBatch(productV1, productV2);
List<Product> productList = productRepository.selectAllProduct(KEYSPACE_NAME);
assertEquals(productList.size(), 2);
assertEquals(productList.get(0).getProductName(), "Banana");
assertEquals(productList.get(1).getProductName(), "Banana");
assertEquals(productList.get(0).getDescription(), "Banana");
assertEquals(productList.get(1).getDescription(), "Banana");
assertEquals(productList.get(0).getPrice(), 12f,0f);
assertEquals(productList.get(1).getPrice(), 12f,0f);
}
@AfterClass
public static void cleanup() {
EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();
}
private Product getTestProduct() {
Product product = new Product();
product.setProductName("Banana");
product.setDescription("Banana");
product.setPrice(12f);
return product;
}
}