diff --git a/examples/cassandra/client.sh b/examples/cassandra/client.sh
index 62245719f1b..5ecb6d6cede 100755
--- a/examples/cassandra/client.sh
+++ b/examples/cassandra/client.sh
@@ -1 +1 @@
-curl -sX POST "http://localhost:7070/druid/v2/?pretty=true" -H 'content-type: application/json' -d @query
+curl -sX POST "http://localhost:9090/druid/v2/?pretty=true" -H 'content-type: application/json' -d @query
diff --git a/server/src/main/java/com/metamx/druid/http/InfoResource.java b/server/src/main/java/com/metamx/druid/http/InfoResource.java
index ff5b5ae58d7..18e9e68e4c4 100644
--- a/server/src/main/java/com/metamx/druid/http/InfoResource.java
+++ b/server/src/main/java/com/metamx/druid/http/InfoResource.java
@@ -72,6 +72,7 @@ public class InfoResource
       InventoryView serverInventoryView,
       DatabaseSegmentManager databaseSegmentManager,
       DatabaseRuleManager databaseRuleManager,
+      @Nullable
       IndexingServiceClient indexingServiceClient
   )
   {
diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java
index c370f0c2ecc..d2b9e2cbe83 100644
--- a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java
+++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java
@@ -22,6 +22,9 @@ package com.metamx.druid.loading.cassandra;
 import org.skife.config.Config;
 
 /**
+ * Cassandra Config
+ *
+ * @author boneill42
  */
 public abstract class CassandraDataSegmentConfig
 {
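For context, CassandraDataSegmentConfig is a skife-config abstract class. Only the accessors are visible from this patch (the CassandraStorage constructor below calls config.getKeyspace() and config.getHost()), so the following is a minimal sketch of its likely shape; the druid.pusher.cassandra.* property names are assumptions, not taken from the patch:

```java
package com.metamx.druid.loading.cassandra;

import org.skife.config.Config;

public abstract class CassandraDataSegmentConfig
{
  // Hypothetical property names; only the accessor names are grounded
  // in the calls made by CassandraStorage's constructor.
  @Config("druid.pusher.cassandra.host")
  public abstract String getHost();

  @Config("druid.pusher.cassandra.keyspace")
  public abstract String getKeyspace();
}
```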
diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java
index f6c42a1d54c..823fa46ce24 100644
--- a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java
+++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java
@@ -20,66 +20,36 @@ package com.metamx.druid.loading.cassandra;
 
 import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
+import java.io.OutputStream;
 
+import com.google.common.base.Throwables;
 import com.google.common.io.Files;
 import com.metamx.common.ISE;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.loading.DataSegmentPuller;
 import com.metamx.druid.loading.SegmentLoadingException;
-import com.netflix.astyanax.AstyanaxContext;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
+import com.metamx.druid.utils.CompressionUtils;
 import com.netflix.astyanax.connectionpool.OperationResult;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
-import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
-import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
-import com.netflix.astyanax.model.ColumnFamily;
 import com.netflix.astyanax.model.ColumnList;
-import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
 import com.netflix.astyanax.recipes.storage.ChunkedStorage;
-import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
 import com.netflix.astyanax.recipes.storage.ObjectMetadata;
-import com.netflix.astyanax.serializers.StringSerializer;
-import com.netflix.astyanax.thrift.ThriftFamilyFactory;
 
 /**
+ * Cassandra Segment Puller
+ *
+ * @author boneill42
  */
-public class CassandraDataSegmentPuller implements DataSegmentPuller
+public class CassandraDataSegmentPuller extends CassandraStorage implements DataSegmentPuller
 {
   private static final Logger log = new Logger(CassandraDataSegmentPuller.class);
-  private static final String CLUSTER_NAME = "druid_cassandra_cluster";
-  private static final String INDEX_TABLE_NAME = "index_storage";
-  private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage";
   private static final int CONCURRENCY = 10;
   private static final int BATCH_SIZE = 10;
 
-  private Keyspace keyspace;
-  private AstyanaxContext<Keyspace> astyanaxContext;
-  private ChunkedStorageProvider indexStorage;
-  private ColumnFamily<String, String> descriptorStorage;
-
-  public CassandraDataSegmentPuller(CassandraDataSegmentConfig config)
+  public CassandraDataSegmentPuller(CassandraDataSegmentConfig config)
   {
-    this.astyanaxContext = new AstyanaxContext.Builder()
-        .forCluster(CLUSTER_NAME)
-        .forKeyspace(config.getKeyspace())
-        .withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE))
-        .withConnectionPoolConfiguration(
-            new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10)
-                .setSeeds(config.getHost())).withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
-        .buildKeyspace(ThriftFamilyFactory.getInstance());
-    this.astyanaxContext.start();
-    this.keyspace = this.astyanaxContext.getEntity();
-
-    indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME);
-
-    descriptorStorage = new ColumnFamily<String, String>(DESCRIPTOR_TABLE_NAME,
-        StringSerializer.get(), StringSerializer.get());
+    super(config);
   }
 
   @Override
@@ -103,27 +73,22 @@ public class CassandraDataSegmentPuller implements DataSegmentPuller
     ObjectMetadata meta = null;
     try
     {
-      final File outFile = new File(outDir, toFilename(key, ".gz"));
+      final File outFile = new File(outDir, "index.zip");
+      log.info("Writing to [" + outFile.getAbsolutePath() + "]");
+      OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput();
       meta = ChunkedStorage
-          .newReader(indexStorage, key, Files.newOutputStreamSupplier(outFile).getOutput())
+          .newReader(indexStorage, key, os)
           .withBatchSize(BATCH_SIZE)
           .withConcurrencyLevel(CONCURRENCY)
-          .call();
+          .call();
+      os.close();
+      CompressionUtils.unzip(outFile, outDir);
     } catch (Exception e)
     {
-      log.error("Could not pull segment [" + key + "] from C*", e);
-      try
-      {
-        FileUtils.deleteDirectory(outDir);
-      } catch (IOException ioe)
-      {
-        log.error("Couldn't delete directory [" + outDir + "] for cleanup.", ioe);
-      }
-    }
-
-    log.info("Pull of file[%s] completed in %,d millis (%s chunks)", key, System.currentTimeMillis() - startTime,
-        meta.getChunkSize());
-
+      throw Throwables.propagate(e);
+    }
+    log.info("Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime,
+        meta.getObjectSize());
   }
 
   @Override
@@ -145,11 +110,4 @@ public class CassandraDataSegmentPuller implements DataSegmentPuller
       throw new SegmentLoadingException(e, e.getMessage());
     }
   }
-
-  private String toFilename(String key, final String suffix)
-  {
-    String filename = key.substring(key.lastIndexOf("/") + 1);
-    filename = filename.substring(0, filename.length() - suffix.length());
-    return filename;
-  }
 }
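The puller's remaining Astyanax imports (OperationResult, ColumnList) serve the descriptor lookup in its last method. A minimal sketch of such a read-back, assuming keyspace and descriptorStorage are inherited from CassandraStorage and using the column names the pusher writes ("lastmodified", "descriptor"); the variable key is the segment's storage key:

```java
// Hypothetical read of a segment's descriptor row from descriptor_storage.
OperationResult<ColumnList<String>> result =
    keyspace.prepareQuery(descriptorStorage).getKey(key).execute();
ColumnList<String> columns = result.getResult();
long lastModified = columns.getColumnByName("lastmodified").getLongValue();
String descriptorJson = columns.getColumnByName("descriptor").getStringValue();
```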
diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java
index 525ff969780..7bd1f897347 100644
--- a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java
+++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java
@@ -24,12 +24,13 @@ import com.netflix.astyanax.model.ColumnFamily;
 import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
 import com.netflix.astyanax.recipes.storage.ChunkedStorage;
 import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
-import com.netflix.astyanax.recipes.storage.ObjectMetadata;
 import com.netflix.astyanax.serializers.StringSerializer;
 import com.netflix.astyanax.thrift.ThriftFamilyFactory;
 
 /**
- * This is the data segment pusher for Cassandra.
+ * Cassandra Segment Pusher
+ *
+ * @author boneill42
  */
 // TODO: Auto-create the schema if it does not exist.
 // Should we make it so they can specify tables?
@@ -84,20 +85,22 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher
     // Create index
     final File compressedIndexFile = File.createTempFile("druid", "index.zip");
     long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile);
+    log.info("Wrote compressed file [%s] to [%s]", compressedIndexFile.getAbsolutePath(), key);
+
     int version = IndexIO.getVersionFromDir(indexFilesDir);
 
     try
     {
-      ObjectMetadata indexMeta = ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
+      long start = System.currentTimeMillis();
+      ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
           .withConcurrencyLevel(CONCURRENCY).call();
       byte[] json = jsonMapper.writeValueAsBytes(segment);
-      //CassandraDataSegmentDescriptor descriptor = new CassandraDataSegmentDescriptor(segment, json);
       MutationBatch mutation = this.keyspace.prepareMutationBatch();
       mutation.withRow(descriptorStorage, key)
           .putColumn("lastmodified", System.currentTimeMillis(), null)
           .putColumn("descriptor", json, null);
       mutation.execute();
-      log.info("Wrote index to C* [" + indexMeta.getParentPath() + "]");
+      log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start);
     } catch (Exception e)
     {
       throw new IOException(e);
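The rewrite above discards the ObjectMetadata the writer used to return. If chunk-level detail is wanted after an upload, Astyanax's info reader can fetch it separately; a hedged sketch, assuming the pusher's indexStorage and key are in scope:

```java
// Hypothetical post-write check: fetch the stored object's metadata to
// confirm the upload and log its size in deep storage.
ObjectMetadata indexMeta = ChunkedStorage.newInfoReader(indexStorage, key).call();
log.info("Object [%s] is %,d bytes in deep storage", key, indexMeta.getObjectSize());
```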
diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraStorage.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraStorage.java
new file mode 100644
index 00000000000..e770af69c59
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraStorage.java
@@ -0,0 +1,71 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.loading.cassandra;
+
+import com.netflix.astyanax.AstyanaxContext;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
+import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
+import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
+import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
+import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
+import com.netflix.astyanax.serializers.StringSerializer;
+import com.netflix.astyanax.thrift.ThriftFamilyFactory;
+
+/**
+ * Superclass for accessing Cassandra Storage.
+ *
+ * This is the schema used to support the index and descriptor storage:
+ *
+ * CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
+ * CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;
+ */
+public class CassandraStorage
+{
+  private static final String CLUSTER_NAME = "druid_cassandra_cluster";
+  private static final String INDEX_TABLE_NAME = "index_storage";
+  private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage";
+
+  private AstyanaxContext<Keyspace> astyanaxContext;
+  Keyspace keyspace;
+  ChunkedStorageProvider indexStorage;
+  ColumnFamily<String, String> descriptorStorage;
+
+  public CassandraStorage(CassandraDataSegmentConfig config)
+  {
+    this.astyanaxContext = new AstyanaxContext.Builder()
+        .forCluster(CLUSTER_NAME)
+        .forKeyspace(config.getKeyspace())
+        .withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE))
+        .withConnectionPoolConfiguration(
+            new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10)
+                .setSeeds(config.getHost())).withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
+        .buildKeyspace(ThriftFamilyFactory.getInstance());
+    this.astyanaxContext.start();
+    this.keyspace = this.astyanaxContext.getEntity();
+
+    indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME);
+
+    descriptorStorage = new ColumnFamily<String, String>(DESCRIPTOR_TABLE_NAME,
+        StringSerializer.get(), StringSerializer.get());
+  }
+}
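To exercise the patch end to end, skife-config can materialize the abstract config from runtime properties, after which the puller's constructor performs the Astyanax setup above. A minimal sketch, assuming hypothetical property names (matching the config sketch earlier) and a local Cassandra on the default Thrift port 9160:

```java
import java.util.Properties;

import org.skife.config.ConfigurationObjectFactory;

import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentPuller;

public class CassandraStorageSmokeTest
{
  public static void main(String[] args) throws Exception
  {
    // Property names are assumptions for illustration; only getHost() and
    // getKeyspace() are known from CassandraStorage's constructor.
    Properties props = new Properties();
    props.setProperty("druid.pusher.cassandra.host", "localhost:9160");
    props.setProperty("druid.pusher.cassandra.keyspace", "druid");

    CassandraDataSegmentConfig config =
        new ConfigurationObjectFactory(props).build(CassandraDataSegmentConfig.class);

    // Opens the connection pool and binds the index/descriptor tables.
    CassandraDataSegmentPuller puller = new CassandraDataSegmentPuller(config);
  }
}
```

Note that CassandraStorage starts its AstyanaxContext but nothing in the patch shuts it down; callers live with the pool for the process lifetime.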