Merge pull request #140 from boneill42/cassandra-segments

Initial implementation of Cassandra Data Segment Pusher and Puller
This commit is contained in:
fjy 2013-05-17 09:56:44 -07:00
commit b0a58af05d
13 changed files with 410 additions and 34 deletions

View File

@ -0,0 +1,32 @@
## Introduction
Druid can use Cassandra as a deep storage mechanism. Segments and their metadata are stored in Cassandra in two tables:
`index_storage` and `descriptor_storage`. Underneath the hood, the Cassandra integration leverages Astyanax. The
index storage table is a [Chunked Object](https://github.com/Netflix/astyanax/wiki/Chunked-Object-Store) repository. It contains
compressed segments for distribution to compute nodes. Since segments can be large, the Chunked Object storage allows the integration to multi-thread
the write to Cassandra, and spreads the data across all the nodes in a cluster. The descriptor storage table is a normal C* table that
stores the segment metadatak.
## Schema
Below are the create statements for each:
CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;
## Getting Started
First create the schema above. (I use a new keyspace called `druid`)
Then, add the following properties to your properties file to enable a Cassandra
backend.
druid.pusher.cassandra=true
druid.pusher.cassandra.host=localhost:9160
druid.pusher.cassandra.keyspace=druid
Use the `druid-development@googlegroups.com` mailing list if you have questions,
or feel free to reach out directly: `bone@alumni.brown.edu`.

1
examples/cassandra/client.sh Executable file
View File

@ -0,0 +1 @@
curl -sX POST "http://localhost:9090/druid/v2/?pretty=true" -H 'content-type: application/json' -d @query

19
examples/cassandra/query Normal file
View File

@ -0,0 +1,19 @@
{
"queryType": "groupBy",
"dataSource": "randSeq",
"granularity": "all",
"dimensions": [],
"aggregations":[
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "events", "name": "e"},
{ "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
],
"postAggregations":[
{ "type":"arithmetic",
"name":"avg_random",
"fn":"/",
"fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
{"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
],
"intervals":["2012-10-01T00:00/2020-01-01T00"]
}

View File

@ -0,0 +1,2 @@
CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;

View File

@ -194,6 +194,11 @@
<artifactId>java-xmlbuilder</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.netflix.astyanax</groupId>
<artifactId>astyanax</artifactId>
<version>1.0.1</version>
</dependency>
<!-- Dependencies required for jets3t -->
<!-- Tests -->

View File

@ -141,17 +141,9 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
private void initializeSegmentLoader()
{
if (segmentLoader == null) {
final Properties props = getProps();
try {
final RestS3Service s3Client = new RestS3Service(
new AWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
);
setSegmentLoader(
ServerInit.makeDefaultQueryableLoader(s3Client, getConfigFactory().build(SegmentLoaderConfig.class))
ServerInit.makeDefaultQueryableLoader(getConfigFactory(), getProps())
);
}
catch (S3ServiceException e) {

View File

@ -72,6 +72,7 @@ public class InfoResource
InventoryView serverInventoryView,
DatabaseSegmentManager databaseSegmentManager,
DatabaseRuleManager databaseRuleManager,
@Nullable
IndexingServiceClient indexingServiceClient
)
{

View File

@ -19,6 +19,18 @@
package com.metamx.druid.initialization;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.skife.config.ConfigurationObjectFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
@ -45,6 +57,9 @@ import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentPuller;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentPusher;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.group.GroupByQuery;
import com.metamx.druid.query.group.GroupByQueryEngine;
@ -60,17 +75,6 @@ import com.metamx.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import com.metamx.druid.query.timeseries.TimeseriesQuery;
import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import com.metamx.druid.utils.PropUtils;
import org.apache.hadoop.conf.Configuration;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.skife.config.ConfigurationObjectFactory;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
@ -79,26 +83,32 @@ public class ServerInit
private static Logger log = new Logger(ServerInit.class);
public static SegmentLoader makeDefaultQueryableLoader(
RestS3Service s3Client,
SegmentLoaderConfig config
)
final ConfigurationObjectFactory configFactory,
final Properties props
) throws S3ServiceException
{
SegmentLoaderConfig config = configFactory.build(SegmentLoaderConfig.class);
DelegatingSegmentLoader delegateLoader = new DelegatingSegmentLoader();
final S3DataSegmentPuller segmentGetter = new S3DataSegmentPuller(s3Client);
final QueryableIndexFactory factory = new MMappedQueryableIndexFactory();
SingleSegmentLoader s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config);
final RestS3Service s3Client = new RestS3Service(
new AWSCredentials(
props.getProperty("com.metamx.aws.accessKey", ""),
props.getProperty("com.metamx.aws.secretKey", "")
)
);
final S3DataSegmentPuller segmentGetter = new S3DataSegmentPuller(s3Client);
final SingleSegmentLoader s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config);
delegateLoader.setLoaderTypes(
ImmutableMap.<String, SegmentLoader>builder()
.put("s3", s3segmentLoader)
.put("s3_zip", s3segmentLoader)
.put("local", new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config))
.put("hdfs", new SingleSegmentLoader(new HdfsDataSegmentPuller(new Configuration()), factory, config))
.put("s3", s3segmentLoader)
.put("s3_zip", s3segmentLoader)
.put("c*",new SingleSegmentLoader(new CassandraDataSegmentPuller(configFactory.build(CassandraDataSegmentConfig.class)), factory, config))
.build()
);
return delegateLoader;
}
@ -171,6 +181,11 @@ public class ServerInit
if (Boolean.parseBoolean(props.getProperty("druid.pusher.local", "false"))) {
return new LocalDataSegmentPusher(configFactory.build(LocalDataSegmentPusherConfig.class), jsonMapper);
}
else if (Boolean.parseBoolean(props.getProperty("druid.pusher.cassandra", "false"))) {
final CassandraDataSegmentConfig config = configFactory.build(CassandraDataSegmentConfig.class);
return new CassandraDataSegmentPusher(config, jsonMapper);
}
else if (Boolean.parseBoolean(props.getProperty("druid.pusher.hdfs", "false"))) {
final HdfsDataSegmentPusherConfig config = configFactory.build(HdfsDataSegmentPusherConfig.class);

View File

@ -0,0 +1,36 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading.cassandra;
import org.skife.config.Config;
/**
* Cassandra Config
*
* @author boneill42
*/
public abstract class CassandraDataSegmentConfig
{
@Config("druid.pusher.cassandra.host")
public abstract String getHost();
@Config("druid.pusher.cassandra.keyspace")
public abstract String getKeyspace();
}

View File

@ -0,0 +1,119 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading.cassandra;
import java.io.File;
import java.io.OutputStream;
import org.apache.commons.io.FileUtils;
import com.google.common.io.Files;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.loading.DataSegmentPuller;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.utils.CompressionUtils;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;
import com.netflix.astyanax.recipes.storage.ObjectMetadata;
/**
* Cassandra Segment Puller
*
* @author boneill42
*/
public class CassandraDataSegmentPuller extends CassandraStorage implements DataSegmentPuller
{
private static final Logger log = new Logger(CassandraDataSegmentPuller.class);
private static final int CONCURRENCY = 10;
private static final int BATCH_SIZE = 10;
public CassandraDataSegmentPuller(CassandraDataSegmentConfig config)
{
super(config);
}
@Override
public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
{
String key = (String) segment.getLoadSpec().get("key");
log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir);
if (!outDir.exists())
{
outDir.mkdirs();
}
if (!outDir.isDirectory())
{
throw new ISE("outDir[%s] must be a directory.", outDir);
}
long startTime = System.currentTimeMillis();
ObjectMetadata meta = null;
final File outFile = new File(outDir, "index.zip");
try
{
try
{
log.info("Writing to [%s]", outFile.getAbsolutePath());
OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput();
meta = ChunkedStorage
.newReader(indexStorage, key, os)
.withBatchSize(BATCH_SIZE)
.withConcurrencyLevel(CONCURRENCY)
.call();
os.close();
CompressionUtils.unzip(outFile, outDir);
} catch (Exception e)
{
FileUtils.deleteDirectory(outDir);
}
} catch (Exception e)
{
throw new SegmentLoadingException(e, e.getMessage());
}
log.info("Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime,
meta.getObjectSize());
}
@Override
public long getLastModified(DataSegment segment) throws SegmentLoadingException
{
String key = (String) segment.getLoadSpec().get("key");
OperationResult<ColumnList<String>> result;
try
{
result = this.keyspace.prepareQuery(descriptorStorage)
.getKey(key)
.execute();
ColumnList<String> children = result.getResult();
long lastModified = children.getColumnByName("lastmodified").getLongValue();
log.info("Read lastModified for [%s] as [%d]", key, lastModified);
return lastModified;
} catch (ConnectionException e)
{
throw new SegmentLoadingException(e, e.getMessage());
}
}
}

View File

@ -0,0 +1,82 @@
package com.metamx.druid.loading.cassandra;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.DataSegmentPusherUtil;
import com.metamx.druid.utils.CompressionUtils;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;
/**
* Cassandra Segment Pusher
*
* @author boneill42
*/
public class CassandraDataSegmentPusher extends CassandraStorage implements DataSegmentPusher
{
private static final Logger log = new Logger(CassandraDataSegmentPusher.class);
private static final int CONCURRENCY = 10;
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private final ObjectMapper jsonMapper;
public CassandraDataSegmentPusher(
CassandraDataSegmentConfig config,
ObjectMapper jsonMapper)
{
super(config);
this.jsonMapper=jsonMapper;
}
@Override
public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
{
log.info("Writing [%s] to C*", indexFilesDir);
String key = JOINER.join(
config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
DataSegmentPusherUtil.getStorageDir(segment)
);
// Create index
final File compressedIndexFile = File.createTempFile("druid", "index.zip");
long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile);
log.info("Wrote compressed file [%s] to [%s]", compressedIndexFile.getAbsolutePath(), key);
int version = IndexIO.getVersionFromDir(indexFilesDir);
try
{
long start = System.currentTimeMillis();
ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
.withConcurrencyLevel(CONCURRENCY).call();
byte[] json = jsonMapper.writeValueAsBytes(segment);
MutationBatch mutation = this.keyspace.prepareMutationBatch();
mutation.withRow(descriptorStorage, key)
.putColumn("lastmodified", System.currentTimeMillis(), null)
.putColumn("descriptor", json, null);
mutation.execute();
log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start);
} catch (Exception e)
{
throw new IOException(e);
}
segment = segment.withSize(indexSize)
.withLoadSpec(
ImmutableMap.<String, Object> of("type", "c*", "key", key)
)
.withBinaryVersion(version);
log.info("Deleting zipped index File[%s]", compressedIndexFile);
compressedIndexFile.delete();
return segment;
}
}

View File

@ -0,0 +1,72 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading.cassandra;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
/**
* Superclass for accessing Cassandra Storage.
*
* This is the schema used to support the index and descriptor storage:
*
* CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
* CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;
*/
public class CassandraStorage
{
private static final String CLUSTER_NAME = "druid_cassandra_cluster";
private static final String INDEX_TABLE_NAME = "index_storage";
private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage";
private AstyanaxContext<Keyspace> astyanaxContext;
final Keyspace keyspace;
final ChunkedStorageProvider indexStorage;
final ColumnFamily<String, String> descriptorStorage;
final CassandraDataSegmentConfig config;
public CassandraStorage(CassandraDataSegmentConfig config)
{
this.astyanaxContext = new AstyanaxContext.Builder()
.forCluster(CLUSTER_NAME)
.forKeyspace(config.getKeyspace())
.withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE))
.withConnectionPoolConfiguration(
new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10)
.setSeeds(config.getHost())).withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
.buildKeyspace(ThriftFamilyFactory.getInstance());
this.astyanaxContext.start();
this.keyspace = this.astyanaxContext.getEntity();
this.config = config;
indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME);
descriptorStorage = new ColumnFamily<String, String>(DESCRIPTOR_TABLE_NAME,
StringSerializer.get(), StringSerializer.get());
}
}