diff --git a/examples/cassandra/README.md b/examples/cassandra/README.md
new file mode 100644
index 00000000000..addc35d9ba7
--- /dev/null
+++ b/examples/cassandra/README.md
@@ -0,0 +1,32 @@
+## Introduction
+Druid can use Cassandra as a deep storage mechanism. Segments and their metadata are stored in Cassandra in two tables:
+`index_storage` and `descriptor_storage`. Under the hood, the Cassandra integration leverages Astyanax. The
+index storage table is a [Chunked Object](https://github.com/Netflix/astyanax/wiki/Chunked-Object-Store) repository. It contains
+compressed segments for distribution to compute nodes. Since segments can be large, Chunked Object storage lets the integration
+multi-thread writes to Cassandra and spread the data across all the nodes in the cluster. The descriptor storage table is a normal C* table that
+stores the segment metadata.
+
+## Schema
+Below are the create statements for each table:
+
+    CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
+
+    CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;
+
+## Getting Started
+First, create the schema above. (I use a new keyspace called `druid`.)
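+For example, the keyspace can be created with `cqlsh` roughly as follows (a
+sketch assuming the CQL3 syntax of Cassandra 1.2+ and a single-node cluster;
+older versions use `strategy_class` options instead, and real deployments
+should raise the replication factor), followed by the two `CREATE TABLE`
+statements from the schema section:
+
+    CREATE KEYSPACE druid
+      WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+    USE druid;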
+
+Then, add the following properties to your properties file to enable the Cassandra
+backend:
+
+    druid.pusher.cassandra=true
+    druid.pusher.cassandra.host=localhost:9160
+    druid.pusher.cassandra.keyspace=druid
+
+Use the `druid-development@googlegroups.com` mailing list if you have questions,
+or feel free to reach out directly: `bone@alumni.brown.edu`.
diff --git a/examples/cassandra/client.sh b/examples/cassandra/client.sh
new file mode 100755
index 00000000000..5ecb6d6cede
--- /dev/null
+++ b/examples/cassandra/client.sh
@@ -0,0 +1 @@
+curl -sX POST "http://localhost:9090/druid/v2/?pretty=true" -H 'content-type: application/json' -d @query
diff --git a/examples/cassandra/query b/examples/cassandra/query
new file mode 100644
index 00000000000..09aa4d3ce5d
--- /dev/null
+++ b/examples/cassandra/query
@@ -0,0 +1,19 @@
+{
+    "queryType": "groupBy",
+    "dataSource": "randSeq",
+    "granularity": "all",
+    "dimensions": [],
+    "aggregations": [
+        { "type": "count", "name": "rows" },
+        { "type": "doubleSum", "fieldName": "events", "name": "e" },
+        { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum" }
+    ],
+    "postAggregations": [
+        { "type": "arithmetic",
+          "name": "avg_random",
+          "fn": "/",
+          "fields": [ { "type": "fieldAccess", "name": "randomNumberSum", "fieldName": "randomNumberSum" },
+                      { "type": "fieldAccess", "name": "rows", "fieldName": "rows" } ] }
+    ],
+    "intervals": ["2012-10-01T00:00/2020-01-01T00"]
+}
diff --git a/examples/cassandra/schema/druid_schema.cql b/examples/cassandra/schema/druid_schema.cql
new file mode 100644
index 00000000000..a38106d9715
--- /dev/null
+++ b/examples/cassandra/schema/druid_schema.cql
@@ -0,0 +1,2 @@
+CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
+CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;
diff --git a/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java b/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java
index 1dc60cd0e31..4cb267f571f 100644
--- a/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java
+++ b/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java
@@ -50,7 +50,7 @@ public class RealtimeStandaloneMain
     rn.setDataSegmentPusher(new NoopDataSegmentPusher());
     rn.setServerView(new NoopServerView());
     rn.setInventoryView(new NoopInventoryView());
-
+
     Runtime.getRuntime().addShutdownHook(
         new Thread(
             new Runnable()
diff --git a/server/pom.xml b/server/pom.xml
index 766ab31d778..244124ed740 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -194,6 +194,11 @@
             <artifactId>java-xmlbuilder</artifactId>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>com.netflix.astyanax</groupId>
+            <artifactId>astyanax</artifactId>
+            <version>1.0.1</version>
+        </dependency>
diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java
index 800b16856c6..f538b69b4db 100644
--- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java
+++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java
@@ -141,17 +141,9 @@ public class ComputeNode extends BaseServerNode
   private void initializeSegmentLoader()
   {
     if (segmentLoader == null) {
-      final Properties props = getProps();
       try {
-        final RestS3Service s3Client = new RestS3Service(
-            new AWSCredentials(
-                PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
-                PropUtils.getProperty(props, "com.metamx.aws.secretKey")
-            )
-        );
-
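+        // ServerInit.makeDefaultQueryableLoader builds the full set of segment
+        // pullers (local, hdfs, s3, s3_zip, c*) from the config factory and
+        // properties, including the S3 client.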
         setSegmentLoader(
-            ServerInit.makeDefaultQueryableLoader(s3Client, getConfigFactory().build(SegmentLoaderConfig.class))
+            ServerInit.makeDefaultQueryableLoader(getConfigFactory(), getProps())
         );
       }
       catch (S3ServiceException e) {
diff --git a/server/src/main/java/com/metamx/druid/http/InfoResource.java b/server/src/main/java/com/metamx/druid/http/InfoResource.java
index ff5b5ae58d7..18e9e68e4c4 100644
--- a/server/src/main/java/com/metamx/druid/http/InfoResource.java
+++ b/server/src/main/java/com/metamx/druid/http/InfoResource.java
@@ -72,6 +72,7 @@ public class InfoResource
       InventoryView serverInventoryView,
       DatabaseSegmentManager databaseSegmentManager,
       DatabaseRuleManager databaseRuleManager,
+      @Nullable
       IndexingServiceClient indexingServiceClient
   )
   {
diff --git a/server/src/main/java/com/metamx/druid/initialization/ServerInit.java b/server/src/main/java/com/metamx/druid/initialization/ServerInit.java
index 657ce0693d6..0ac5e67a4de 100644
--- a/server/src/main/java/com/metamx/druid/initialization/ServerInit.java
+++ b/server/src/main/java/com/metamx/druid/initialization/ServerInit.java
@@ -19,6 +19,18 @@
 
 package com.metamx.druid.initialization;
 
+import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.security.AWSCredentials;
+import org.skife.config.ConfigurationObjectFactory;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
@@ -45,6 +57,9 @@ import com.metamx.druid.loading.S3DataSegmentPusherConfig;
 import com.metamx.druid.loading.SegmentLoader;
 import com.metamx.druid.loading.SegmentLoaderConfig;
 import com.metamx.druid.loading.SingleSegmentLoader;
+import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig;
+import com.metamx.druid.loading.cassandra.CassandraDataSegmentPuller;
+import com.metamx.druid.loading.cassandra.CassandraDataSegmentPusher;
 import com.metamx.druid.query.QueryRunnerFactory;
 import com.metamx.druid.query.group.GroupByQuery;
 import com.metamx.druid.query.group.GroupByQueryEngine;
@@ -60,17 +75,6 @@ import com.metamx.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
 import com.metamx.druid.query.timeseries.TimeseriesQuery;
 import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory;
 import com.metamx.druid.utils.PropUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.jets3t.service.S3ServiceException;
-import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.jets3t.service.security.AWSCredentials;
-import org.skife.config.ConfigurationObjectFactory;
-
-import java.lang.reflect.InvocationTargetException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  */
@@ -79,26 +83,32 @@ public class ServerInit
   private static Logger log = new Logger(ServerInit.class);
 
   public static SegmentLoader makeDefaultQueryableLoader(
-      RestS3Service s3Client,
-      SegmentLoaderConfig config
-  )
+      final ConfigurationObjectFactory configFactory,
+      final Properties props
+  ) throws S3ServiceException
   {
+    SegmentLoaderConfig config = configFactory.build(SegmentLoaderConfig.class);
     DelegatingSegmentLoader delegateLoader = new DelegatingSegmentLoader();
-
-    final S3DataSegmentPuller segmentGetter = new S3DataSegmentPuller(s3Client);
     final QueryableIndexFactory factory = new MMappedQueryableIndexFactory();
-    SingleSegmentLoader s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config);
-
+    final RestS3Service s3Client = new RestS3Service(
+        new AWSCredentials(
+            props.getProperty("com.metamx.aws.accessKey", ""),
+            props.getProperty("com.metamx.aws.secretKey", "")
+        )
+    );
+    final S3DataSegmentPuller segmentGetter = new S3DataSegmentPuller(s3Client);
+    final SingleSegmentLoader s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config);
+
     delegateLoader.setLoaderTypes(
         ImmutableMap.<String, SegmentLoader>builder()
-                    .put("s3", s3segmentLoader)
-                    .put("s3_zip", s3segmentLoader)
-                    .put("local", new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config))
-                    .put("hdfs", new SingleSegmentLoader(new HdfsDataSegmentPuller(new Configuration()), factory, config))
-                    .build()
+                    .put("local", new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config))
+                    .put("hdfs", new SingleSegmentLoader(new HdfsDataSegmentPuller(new Configuration()), factory, config))
+                    .put("s3", s3segmentLoader)
+                    .put("s3_zip", s3segmentLoader)
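+                    // "c*" is the load spec type written by CassandraDataSegmentPusher;
+                    // those segments are read back through the chunked-object puller.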
+                    .put("c*", new SingleSegmentLoader(new CassandraDataSegmentPuller(configFactory.build(CassandraDataSegmentConfig.class)), factory, config))
+                    .build()
     );
-
     return delegateLoader;
   }
@@ -171,6 +181,11 @@ public class ServerInit
     if (Boolean.parseBoolean(props.getProperty("druid.pusher.local", "false"))) {
       return new LocalDataSegmentPusher(configFactory.build(LocalDataSegmentPusherConfig.class), jsonMapper);
     }
+    else if (Boolean.parseBoolean(props.getProperty("druid.pusher.cassandra", "false"))) {
+      final CassandraDataSegmentConfig config = configFactory.build(CassandraDataSegmentConfig.class);
+
+      return new CassandraDataSegmentPusher(config, jsonMapper);
+    }
     else if (Boolean.parseBoolean(props.getProperty("druid.pusher.hdfs", "false"))) {
       final HdfsDataSegmentPusherConfig config = configFactory.build(HdfsDataSegmentPusherConfig.class);
diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java
new file mode 100644
index 00000000000..d2b9e2cbe83
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.loading.cassandra;
+
+import org.skife.config.Config;
+
+/**
+ * Cassandra Config
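+ *
+ * Binds the connection settings described in examples/cassandra/README.md:
+ * druid.pusher.cassandra.host is the host:port of a Cassandra node used as the
+ * Astyanax seed (e.g. localhost:9160), and druid.pusher.cassandra.keyspace is
+ * the keyspace holding the index_storage and descriptor_storage tables.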
+ *
+ * @author boneill42
+ */
+public abstract class CassandraDataSegmentConfig
+{
+  @Config("druid.pusher.cassandra.host")
+  public abstract String getHost();
+
+  @Config("druid.pusher.cassandra.keyspace")
+  public abstract String getKeyspace();
+}
diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java
new file mode 100644
index 00000000000..87e6105f161
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java
@@ -0,0 +1,119 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.loading.cassandra;
+
+import java.io.File;
+import java.io.OutputStream;
+
+import org.apache.commons.io.FileUtils;
+
+import com.google.common.io.Files;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.loading.DataSegmentPuller;
+import com.metamx.druid.loading.SegmentLoadingException;
+import com.metamx.druid.utils.CompressionUtils;
+import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.ColumnList;
+import com.netflix.astyanax.recipes.storage.ChunkedStorage;
+import com.netflix.astyanax.recipes.storage.ObjectMetadata;
+
+/**
+ * Cassandra Segment Puller
+ *
+ * @author boneill42
+ */
+public class CassandraDataSegmentPuller extends CassandraStorage implements DataSegmentPuller
+{
+  private static final Logger log = new Logger(CassandraDataSegmentPuller.class);
+  private static final int CONCURRENCY = 10;
+  private static final int BATCH_SIZE = 10;
+
+  public CassandraDataSegmentPuller(CassandraDataSegmentConfig config)
+  {
+    super(config);
+  }
+
+  @Override
+  public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
+  {
+    String key = (String) segment.getLoadSpec().get("key");
+    log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir);
+
+    if (!outDir.exists())
+    {
+      outDir.mkdirs();
+    }
+
+    if (!outDir.isDirectory())
+    {
+      throw new ISE("outDir[%s] must be a directory.", outDir);
+    }
+
+    long startTime = System.currentTimeMillis();
+    ObjectMetadata meta = null;
+    final File outFile = new File(outDir, "index.zip");
+    try
+    {
+      try
+      {
+        log.info("Writing to [%s]", outFile.getAbsolutePath());
+        OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput();
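+        // Stream the chunked object out of the index_storage table into
+        // index.zip, BATCH_SIZE chunks at a time across CONCURRENCY reader
+        // threads, then unzip it into the segment directory.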
+        meta = ChunkedStorage
+            .newReader(indexStorage, key, os)
+            .withBatchSize(BATCH_SIZE)
+            .withConcurrencyLevel(CONCURRENCY)
+            .call();
+        os.close();
+        CompressionUtils.unzip(outFile, outDir);
+      } catch (Exception e)
+      {
+        // Clean up the partial pull and rethrow; swallowing the exception
+        // here would NPE below while meta is still null.
+        FileUtils.deleteDirectory(outDir);
+        throw e;
+      }
+    } catch (Exception e)
+    {
+      throw new SegmentLoadingException(e, e.getMessage());
+    }
+    log.info("Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime,
+        meta.getObjectSize());
+  }
+
+  @Override
+  public long getLastModified(DataSegment segment) throws SegmentLoadingException
+  {
+    String key = (String) segment.getLoadSpec().get("key");
+    OperationResult<ColumnList<String>> result;
+    try
+    {
+      result = this.keyspace.prepareQuery(descriptorStorage)
+          .getKey(key)
+          .execute();
+      ColumnList<String> children = result.getResult();
+      long lastModified = children.getColumnByName("lastmodified").getLongValue();
+      log.info("Read lastModified for [%s] as [%d]", key, lastModified);
+      return lastModified;
+    } catch (ConnectionException e)
+    {
+      throw new SegmentLoadingException(e, e.getMessage());
+    }
+  }
+}
diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java
new file mode 100644
index 00000000000..57bc72b9124
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java
@@ -0,0 +1,82 @@
+package com.metamx.druid.loading.cassandra;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.index.v1.IndexIO;
+import com.metamx.druid.loading.DataSegmentPusher;
+import com.metamx.druid.loading.DataSegmentPusherUtil;
+import com.metamx.druid.utils.CompressionUtils;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.recipes.storage.ChunkedStorage;
+
+/**
+ * Cassandra Segment Pusher
+ *
+ * @author boneill42
+ */
+public class CassandraDataSegmentPusher extends CassandraStorage implements DataSegmentPusher
+{
+  private static final Logger log = new Logger(CassandraDataSegmentPusher.class);
+  private static final int CONCURRENCY = 10;
+  private static final Joiner JOINER = Joiner.on("/").skipNulls();
+  private final ObjectMapper jsonMapper;
+
+  public CassandraDataSegmentPusher(
+      CassandraDataSegmentConfig config,
+      ObjectMapper jsonMapper)
+  {
+    super(config);
+    this.jsonMapper = jsonMapper;
+  }
+
+  @Override
+  public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
+  {
+    log.info("Writing [%s] to C*", indexFilesDir);
+    String key = JOINER.join(
+        config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
+        DataSegmentPusherUtil.getStorageDir(segment)
+    );
+
+    // Create index
+    final File compressedIndexFile = File.createTempFile("druid", "index.zip");
+    long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile);
+    log.info("Wrote compressed file [%s] to [%s]", compressedIndexFile.getAbsolutePath(), key);
+
+    int version = IndexIO.getVersionFromDir(indexFilesDir);
+
+    try
+    {
+      long start = System.currentTimeMillis();
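+      // Upload the zipped segment to index_storage via the chunked-object
+      // recipe, then record the descriptor JSON and a lastmodified timestamp
+      // under the same key in descriptor_storage in a single mutation batch.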
+      final FileInputStream in = new FileInputStream(compressedIndexFile);
+      ChunkedStorage.newWriter(indexStorage, key, in)
+          .withConcurrencyLevel(CONCURRENCY).call();
+      in.close();
+      byte[] json = jsonMapper.writeValueAsBytes(segment);
+      MutationBatch mutation = this.keyspace.prepareMutationBatch();
+      mutation.withRow(descriptorStorage, key)
+          .putColumn("lastmodified", System.currentTimeMillis(), null)
+          .putColumn("descriptor", json, null);
+      mutation.execute();
+      log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start);
+    } catch (Exception e)
+    {
+      throw new IOException(e);
+    }
+
+    segment = segment.withSize(indexSize)
+        .withLoadSpec(
+            ImmutableMap.<String, Object>of("type", "c*", "key", key)
+        )
+        .withBinaryVersion(version);
+
+    log.info("Deleting zipped index File[%s]", compressedIndexFile);
+    compressedIndexFile.delete();
+    return segment;
+  }
+}
diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraStorage.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraStorage.java
new file mode 100644
index 00000000000..e9d04609135
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraStorage.java
@@ -0,0 +1,72 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.loading.cassandra;
+
+import com.netflix.astyanax.AstyanaxContext;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
+import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
+import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
+import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
+import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
+import com.netflix.astyanax.serializers.StringSerializer;
+import com.netflix.astyanax.thrift.ThriftFamilyFactory;
+
+/**
+ * Superclass for accessing Cassandra Storage.
+ *
+ * This is the schema used to support the index and descriptor storage:
+ *
+ * CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
+ * CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;
+ */
+public class CassandraStorage
+{
+  private static final String CLUSTER_NAME = "druid_cassandra_cluster";
+  private static final String INDEX_TABLE_NAME = "index_storage";
+  private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage";
+
+  private AstyanaxContext<Keyspace> astyanaxContext;
+  final Keyspace keyspace;
+  final ChunkedStorageProvider indexStorage;
+  final ColumnFamily<String, String> descriptorStorage;
+  final CassandraDataSegmentConfig config;
+
+  public CassandraStorage(CassandraDataSegmentConfig config)
+  {
+    this.astyanaxContext = new AstyanaxContext.Builder()
+        .forCluster(CLUSTER_NAME)
+        .forKeyspace(config.getKeyspace())
+        .withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE))
+        .withConnectionPoolConfiguration(
+            new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10)
+                .setSeeds(config.getHost()))
+        .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
+        .buildKeyspace(ThriftFamilyFactory.getInstance());
+    this.astyanaxContext.start();
+    this.keyspace = this.astyanaxContext.getEntity();
+    this.config = config;
+    indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME);
+
+    descriptorStorage = new ColumnFamily<String, String>(DESCRIPTOR_TABLE_NAME,
+        StringSerializer.get(), StringSerializer.get());
+  }
+}