From 49319e7eb0477c80b4a89e60fd1076e9863455aa Mon Sep 17 00:00:00 2001 From: Saikat Chakraborty <40471715+saikatcse03@users.noreply.github.com> Date: Thu, 20 Jan 2022 19:10:00 +0530 Subject: [PATCH] BAEL-4982 Batching In Cassandra (#11574) * Implemented cassandra batch query * Added netty version param * Reformatted correctly * Reformatted correctly * Reformatted correctly * Formatting fix resolved * Formatting fix resolved * Removed unused method * Refactored method for better readability * tab spaces corrected Co-authored-by: saikat chakraborty --- persistence-modules/java-cassandra/pom.xml | 8 +- .../baeldung/cassandra/batch/Application.java | 60 ++++++ .../cassandra/batch/CassandraConnector.java | 31 +++ .../cassandra/batch/domain/Product.java | 79 ++++++++ .../batch/repository/KeyspaceRepository.java | 27 +++ .../batch/repository/ProductRepository.java | 188 ++++++++++++++++++ .../ProductRepositoryIntegrationTest.java | 133 +++++++++++++ 7 files changed, 525 insertions(+), 1 deletion(-) create mode 100644 persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/Application.java create mode 100644 persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/CassandraConnector.java create mode 100644 persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/domain/Product.java create mode 100644 persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/repository/KeyspaceRepository.java create mode 100644 persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/repository/ProductRepository.java create mode 100644 persistence-modules/java-cassandra/src/test/java/com/baeldung/cassandra/batch/epository/ProductRepositoryIntegrationTest.java diff --git a/persistence-modules/java-cassandra/pom.xml b/persistence-modules/java-cassandra/pom.xml index ad80fc8a83..6df75edc56 100644 --- a/persistence-modules/java-cassandra/pom.xml +++ b/persistence-modules/java-cassandra/pom.xml @@ -43,6 +43,11 @@ java-driver-query-builder ${datastax-cassandra.version} + + io.netty + netty-transport + ${netty-transport-version} + @@ -50,6 +55,7 @@ 3.1.2 3.1.1.0 4.1.0 + 4.1.71.Final - \ No newline at end of file + diff --git a/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/Application.java b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/Application.java new file mode 100644 index 0000000000..598b72338e --- /dev/null +++ b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/Application.java @@ -0,0 +1,60 @@ +package com.baeldung.cassandra.batch; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.baeldung.cassandra.batch.domain.Product; +import com.baeldung.cassandra.batch.repository.KeyspaceRepository; +import com.baeldung.cassandra.batch.repository.ProductRepository; +import com.datastax.oss.driver.api.core.CqlSession; + +public class Application { + + private static final Logger LOG = LoggerFactory.getLogger(Application.class); + + public static void main(String[] args) { + new Application().run(); + } + + public void run() { + CassandraConnector connector = new CassandraConnector(); + connector.connect("127.0.0.1", 9042, "datacenter1"); + CqlSession session = connector.getSession(); + + KeyspaceRepository keyspaceRepository = new KeyspaceRepository(session); + + keyspaceRepository.createKeyspace("testKeyspace", 1); + keyspaceRepository.useKeyspace("testKeyspace"); + + ProductRepository productRepository = new ProductRepository(session); + + productRepository.createProductTable("testKeyspace"); + productRepository.createProductByIdTable("testKeyspace"); + productRepository.createProductByIdTable("testKeyspace"); + Product product = getProduct(); + productRepository.insertProductBatch(product); + + Product productV1 = getProduct(); + Product productV2 = getProduct(); + + productRepository.insertProductVariantBatch(productV1, productV2); + + + List products = productRepository.selectAllProduct("testKeyspace"); + products.forEach(x -> LOG.info(x.toString())); + connector.close(); + } + + private Product getProduct() { + Product product = new Product(); + product.setProductName("Banana"); + product.setDescription("Banana"); + product.setPrice(12f); + + return product; + } +} + + diff --git a/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/CassandraConnector.java b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/CassandraConnector.java new file mode 100644 index 0000000000..02d63c62f6 --- /dev/null +++ b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/CassandraConnector.java @@ -0,0 +1,31 @@ +package com.baeldung.cassandra.batch; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.CqlSessionBuilder; + +import java.net.InetSocketAddress; + +import org.apache.commons.lang3.StringUtils; + +public class CassandraConnector { + + private CqlSession session; + + public void connect(final String node, final Integer port, final String dataCenter) { + CqlSessionBuilder builder = CqlSession.builder(); + builder.addContactPoint(new InetSocketAddress(node, port)); + if (StringUtils.isNotBlank(dataCenter)) { + builder.withLocalDatacenter(dataCenter); + } + + session = builder.build(); + } + + public CqlSession getSession() { + return this.session; + } + + public void close() { + session.close(); + } +} diff --git a/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/domain/Product.java b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/domain/Product.java new file mode 100644 index 0000000000..a787225fae --- /dev/null +++ b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/domain/Product.java @@ -0,0 +1,79 @@ +package com.baeldung.cassandra.batch.domain; + +import java.util.UUID; + +public class Product { + + public Product() { + super(); + } + + public Product(UUID productId, UUID variantId, String productName, String description, float price) { + super(); + this.productId = productId; + this.variantId = variantId; + this.productName = productName; + this.description = description; + this.price = price; + } + + public Product(UUID productId, String productName, String description, float price) { + super(); + this.productId = productId; + this.productName = productName; + this.description = description; + this.price = price; + } + + private UUID productId; + private UUID variantId; + private String productName; + private String description; + private float price; + + public UUID getProductId() { + return productId; + } + + public void setProductId(UUID productId) { + this.productId = productId; + } + + public UUID getVariantId() { + return variantId; + } + + public void setVariantId(UUID variantId) { + this.variantId = variantId; + } + + public String getProductName() { + return productName; + } + + public void setProductName(String productName) { + this.productName = productName; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public float getPrice() { + return price; + } + + public void setPrice(float price) { + this.price = price; + } + + @Override + public String toString() { + return "Product [productId=" + productId + ", variantId=" + variantId + ", productName=" + productName + + ", description=" + description + ", price=" + price + "]"; + } +} diff --git a/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/repository/KeyspaceRepository.java b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/repository/KeyspaceRepository.java new file mode 100644 index 0000000000..6d09af0bd3 --- /dev/null +++ b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/repository/KeyspaceRepository.java @@ -0,0 +1,27 @@ +package com.baeldung.cassandra.batch.repository; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.CqlSession; + +import com.datastax.oss.driver.api.querybuilder.SchemaBuilder; +import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace; + +public class KeyspaceRepository { + private final CqlSession session; + + public KeyspaceRepository(CqlSession session) { + this.session = session; + } + + public void createKeyspace(String keyspaceName, int numberOfReplicas) { + CreateKeyspace createKeyspace = SchemaBuilder.createKeyspace(keyspaceName) + .ifNotExists() + .withSimpleStrategy(numberOfReplicas); + + session.execute(createKeyspace.build()); + } + + public void useKeyspace(String keyspace) { + session.execute("USE " + CqlIdentifier.fromCql(keyspace)); + } +} diff --git a/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/repository/ProductRepository.java b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/repository/ProductRepository.java new file mode 100644 index 0000000000..106db133d1 --- /dev/null +++ b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/repository/ProductRepository.java @@ -0,0 +1,188 @@ +package com.baeldung.cassandra.batch.repository; + +import com.baeldung.cassandra.batch.domain.Product; +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.DefaultBatchType; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.BatchStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.type.DataTypes; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.datastax.oss.driver.api.querybuilder.SchemaBuilder; +import com.datastax.oss.driver.api.querybuilder.schema.CreateTable; +import com.datastax.oss.driver.api.querybuilder.select.Select; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +public class ProductRepository { + + private static final String PRODUCT_TABLE_NAME = "product"; + private static final String PRODUCT_BY_ID_TABLE_NAME = "product_by_id"; + private static final String PRODUCT_BY_NAME_TABLE_NAME = "product_by_name"; + + private final CqlSession session; + + public ProductRepository(CqlSession session) { + this.session = session; + } + + public void createProductTable(String keyspace) { + CreateTable createTable = SchemaBuilder.createTable(PRODUCT_TABLE_NAME).ifNotExists() + .withPartitionKey("product_id", DataTypes.UUID) + .withClusteringColumn("variant_id", DataTypes.UUID) + .withColumn("product_name", DataTypes.TEXT) + .withColumn("description", DataTypes.TEXT) + .withColumn("price", DataTypes.FLOAT); + + executeStatement(createTable.build(), keyspace); + } + + public void createProductByIdTable(String keyspace) { + CreateTable createTable = SchemaBuilder.createTable(PRODUCT_BY_ID_TABLE_NAME).ifNotExists() + .withPartitionKey("product_id", DataTypes.UUID) + .withColumn("product_name", DataTypes.TEXT) + .withColumn("title", DataTypes.TEXT) + .withColumn("description", DataTypes.TEXT) + .withColumn("price", DataTypes.FLOAT); + + executeStatement(createTable.build(), keyspace); + } + + public void createProductTableByName(String keyspace) { + CreateTable createTable = SchemaBuilder.createTable(PRODUCT_BY_NAME_TABLE_NAME).ifNotExists() + .withPartitionKey("product_name", DataTypes.TEXT) + .withColumn("product_id", DataTypes.UUID) + .withColumn("description", DataTypes.TEXT) + .withColumn("price", DataTypes.FLOAT); + + executeStatement(createTable.build(), keyspace); + } + + /** + * Insert two variant Product into same table using a batch query. + * + * @param Product + */ + public void insertProductVariantBatch(Product productVariant1,Product productVariant2) { + UUID productId = UUID.randomUUID(); + BoundStatement productBoundStatement1 = this.getProductVariantInsertStatement(productVariant1,productId); + BoundStatement productBoundStatement2 = this.getProductVariantInsertStatement(productVariant2,productId); + + BatchStatement batch = BatchStatement.newInstance(DefaultBatchType.UNLOGGED, + productBoundStatement1,productBoundStatement2); + + session.execute(batch); + } + + + /** + * Insert two same Product into related tables using a batch query. + * + * @param book + */ + public void insertProductBatch(Product product) { + UUID productId = UUID.randomUUID(); + + BoundStatement productBoundStatement1 = this.getProductInsertStatement(product,productId,PRODUCT_BY_ID_TABLE_NAME); + BoundStatement productBoundStatement2 = this.getProductInsertStatement(product,productId,PRODUCT_BY_NAME_TABLE_NAME); + + BatchStatement batch = BatchStatement.newInstance(DefaultBatchType.LOGGED, + productBoundStatement1,productBoundStatement2); + + session.execute(batch); + } + + public List selectAllProduct(String keyspace) { + Select select = QueryBuilder.selectFrom(PRODUCT_TABLE_NAME).all(); + + ResultSet resultSet = executeStatement(select.build(), keyspace); + + List result = new ArrayList<>(); + + resultSet.forEach(x -> result.add(new Product(x.getUuid("product_id"), x.getUuid("variant_id"), + x.getString("product_name"), x.getString("description"), x.getFloat("price")))); + + return result; + } + + public List selectAllProductByName(String keyspace) { + Select select = QueryBuilder.selectFrom(PRODUCT_BY_NAME_TABLE_NAME).all(); + + ResultSet resultSet = executeStatement(select.build(), keyspace); + + List result = new ArrayList<>(); + + resultSet.forEach(x -> result.add(new Product(x.getUuid("product_id"), + x.getString("product_name"), x.getString("description"), x.getFloat("price")))); + + return result; + } + + public List selectAllProductById(String keyspace) { + Select select = QueryBuilder.selectFrom(PRODUCT_BY_ID_TABLE_NAME).all(); + + ResultSet resultSet = executeStatement(select.build(), keyspace); + + List result = new ArrayList<>(); + + resultSet.forEach(x -> result.add(new Product(x.getUuid("product_id"), + x.getString("product_name"), x.getString("description"), x.getFloat("price")))); + + return result; + } + + /** + * Delete table. + * + * @param tableName the name of the table to delete. + */ + public void deleteTable(String tableName) { + StringBuilder sb = new StringBuilder("DROP TABLE IF EXISTS ").append(tableName); + + final String query = sb.toString(); + session.execute(query); + } + + private ResultSet executeStatement(SimpleStatement statement, String keyspace) { + if (keyspace != null) { + statement.setKeyspace(CqlIdentifier.fromCql(keyspace)); + } + + return session.execute(statement); + } + + private BoundStatement getProductVariantInsertStatement(Product product,UUID productId) { + String insertQuery = new StringBuilder("").append("INSERT INTO ").append(PRODUCT_TABLE_NAME) + .append("(product_id,variant_id,product_name,description,price) ").append("VALUES (").append(":product_id") + .append(", ").append(":variant_id").append(", ").append(":product_name").append(", ") + .append(":description").append(", ").append(":price").append(");").toString(); + + PreparedStatement preparedStatement = session.prepare(insertQuery); + + return preparedStatement.bind(productId, UUID.randomUUID(), + product.getProductName(), + product.getDescription(), + product.getPrice()); + } + + private BoundStatement getProductInsertStatement(Product product,UUID productId,String productTableName) { + String cqlQuery1 = new StringBuilder("").append("INSERT INTO ").append(productTableName) + .append("(product_id,product_name,description,price) ").append("VALUES (").append(":product_id") + .append(", ").append(":product_name").append(", ").append(":description").append(", ") + .append(":price").append(");").toString(); + + PreparedStatement preparedStatement = session.prepare(cqlQuery1); + + return preparedStatement.bind(productId, + product.getProductName(), + product.getDescription(), + product.getPrice()); + } + + +} diff --git a/persistence-modules/java-cassandra/src/test/java/com/baeldung/cassandra/batch/epository/ProductRepositoryIntegrationTest.java b/persistence-modules/java-cassandra/src/test/java/com/baeldung/cassandra/batch/epository/ProductRepositoryIntegrationTest.java new file mode 100644 index 0000000000..55dc3dad9f --- /dev/null +++ b/persistence-modules/java-cassandra/src/test/java/com/baeldung/cassandra/batch/epository/ProductRepositoryIntegrationTest.java @@ -0,0 +1,133 @@ +package com.baeldung.cassandra.batch.epository; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.thrift.transport.TTransportException; +import org.cassandraunit.utils.EmbeddedCassandraServerHelper; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import com.baeldung.cassandra.batch.CassandraConnector; +import com.baeldung.cassandra.batch.domain.Product; +import com.baeldung.cassandra.batch.repository.KeyspaceRepository; +import com.baeldung.cassandra.batch.repository.ProductRepository; +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.ColumnDefinition; +import com.datastax.oss.driver.api.core.cql.ResultSet; + +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class ProductRepositoryIntegrationTest { + + private KeyspaceRepository schemaRepository; + + private ProductRepository productRepository; + + private CqlSession session; + + private final String KEYSPACE_NAME = "testBaeldungKeyspace"; + private final String PRODUCT = "product"; + + @BeforeClass + public static void init() throws ConfigurationException, TTransportException, IOException, InterruptedException { + // Start an embedded Cassandra Server + EmbeddedCassandraServerHelper.startEmbeddedCassandra(20000L); + } + + @Before + public void connect() { + CassandraConnector client = new CassandraConnector(); + client.connect("127.0.0.1", 9142,"datacenter1"); + session = client.getSession(); + schemaRepository = new KeyspaceRepository(client.getSession()); + schemaRepository.createKeyspace(KEYSPACE_NAME, 1); + schemaRepository.useKeyspace(KEYSPACE_NAME); + productRepository = new ProductRepository(client.getSession()); + } + + @Test + public void whenCreatingAProductTable_thenCreatedCorrectly() { + productRepository.deleteTable(KEYSPACE_NAME); + productRepository.createProductTable(KEYSPACE_NAME); + + ResultSet result = session.execute("SELECT * FROM " + KEYSPACE_NAME + "." + PRODUCT + ";"); + + List colDef = new ArrayList<>(); + + result.getColumnDefinitions().forEach(columnDef -> colDef.add(columnDef)); + List columnNames = colDef.stream().map(ColumnDefinition::getName).map(CqlIdentifier::toString).collect(Collectors.toList()); + assertEquals(columnNames.size(), 5); + assertTrue(columnNames.contains("product_id")); + assertTrue(columnNames.contains("variant_id")); + assertTrue(columnNames.contains("product_name")); + assertTrue(columnNames.contains("description")); + assertTrue(columnNames.contains("price")); + } + + @Test + public void whenCreatingRelatedProductBatch_thenCreatedCorrectly() { + productRepository.deleteTable(KEYSPACE_NAME); + productRepository.createProductTableByName(KEYSPACE_NAME); + productRepository.createProductByIdTable(KEYSPACE_NAME); + + Product product = getTestProduct(); + productRepository.insertProductBatch(product); + List productByIdList = productRepository.selectAllProductById(KEYSPACE_NAME); + List productByNameList = productRepository.selectAllProductByName(KEYSPACE_NAME); + + assertEquals(productByIdList.size(), 1); + assertEquals(productByNameList.size(), 1); + assertEquals(productByIdList.get(0).getProductName(), "Banana"); + assertEquals(productByNameList.get(0).getProductName(), "Banana"); + assertEquals(productByIdList.get(0).getDescription(), "Banana"); + assertEquals(productByNameList.get(0).getDescription(), "Banana"); + assertEquals(productByIdList.get(0).getPrice(), 12f,0f); + assertEquals(productByNameList.get(0).getPrice(), 12f,0f); + } + + @Test + public void whenCreatingMultiVariantProductBatch_thenCreatedCorrectly() { + productRepository.deleteTable(KEYSPACE_NAME); + productRepository.createProductTable(KEYSPACE_NAME); + + Product productV1 = getTestProduct(); + Product productV2 = getTestProduct(); + productRepository.insertProductVariantBatch(productV1, productV2); + List productList = productRepository.selectAllProduct(KEYSPACE_NAME); + + assertEquals(productList.size(), 2); + assertEquals(productList.get(0).getProductName(), "Banana"); + assertEquals(productList.get(1).getProductName(), "Banana"); + assertEquals(productList.get(0).getDescription(), "Banana"); + assertEquals(productList.get(1).getDescription(), "Banana"); + assertEquals(productList.get(0).getPrice(), 12f,0f); + assertEquals(productList.get(1).getPrice(), 12f,0f); + } + + + + @AfterClass + public static void cleanup() { + EmbeddedCassandraServerHelper.cleanEmbeddedCassandra(); + } + + private Product getTestProduct() { + Product product = new Product(); + product.setProductName("Banana"); + product.setDescription("Banana"); + product.setPrice(12f); + + return product; + } +}