Working Push & Pull.

This commit is contained in:
Brian O'Neill 2013-05-07 11:35:14 -04:00
parent 17835e6e08
commit 41e9f8fcb4
6 changed files with 104 additions and 68 deletions

View File

@ -1 +1 @@
curl -sX POST "http://localhost:7070/druid/v2/?pretty=true" -H 'content-type: application/json' -d @query curl -sX POST "http://localhost:9090/druid/v2/?pretty=true" -H 'content-type: application/json' -d @query

View File

@ -72,6 +72,7 @@ public class InfoResource
InventoryView serverInventoryView, InventoryView serverInventoryView,
DatabaseSegmentManager databaseSegmentManager, DatabaseSegmentManager databaseSegmentManager,
DatabaseRuleManager databaseRuleManager, DatabaseRuleManager databaseRuleManager,
@Nullable
IndexingServiceClient indexingServiceClient IndexingServiceClient indexingServiceClient
) )
{ {

View File

@ -22,6 +22,9 @@ package com.metamx.druid.loading.cassandra;
import org.skife.config.Config; import org.skife.config.Config;
/** /**
* Cassandra Config
*
* @author boneill42
*/ */
public abstract class CassandraDataSegmentConfig public abstract class CassandraDataSegmentConfig
{ {

View File

@ -20,66 +20,36 @@
package com.metamx.druid.loading.cassandra; package com.metamx.druid.loading.cassandra;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.OutputStream;
import org.apache.commons.io.FileUtils;
import com.google.common.base.Throwables;
import com.google.common.io.Files; import com.google.common.io.Files;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.loading.DataSegmentPuller; import com.metamx.druid.loading.DataSegmentPuller;
import com.metamx.druid.loading.SegmentLoadingException; import com.metamx.druid.loading.SegmentLoadingException;
import com.netflix.astyanax.AstyanaxContext; import com.metamx.druid.utils.CompressionUtils;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult; import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList; import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
import com.netflix.astyanax.recipes.storage.ChunkedStorage; import com.netflix.astyanax.recipes.storage.ChunkedStorage;
import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
import com.netflix.astyanax.recipes.storage.ObjectMetadata; import com.netflix.astyanax.recipes.storage.ObjectMetadata;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
/** /**
* Cassandra Segment Puller
*
* @author boneill42
*/ */
public class CassandraDataSegmentPuller implements DataSegmentPuller public class CassandraDataSegmentPuller extends CassandraStorage implements DataSegmentPuller
{ {
private static final Logger log = new Logger(CassandraDataSegmentPuller.class); private static final Logger log = new Logger(CassandraDataSegmentPuller.class);
private static final String CLUSTER_NAME = "druid_cassandra_cluster";
private static final String INDEX_TABLE_NAME = "index_storage";
private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage";
private static final int CONCURRENCY = 10; private static final int CONCURRENCY = 10;
private static final int BATCH_SIZE = 10; private static final int BATCH_SIZE = 10;
private Keyspace keyspace;
private AstyanaxContext<Keyspace> astyanaxContext;
private ChunkedStorageProvider indexStorage;
private ColumnFamily<String, String> descriptorStorage;
public CassandraDataSegmentPuller(CassandraDataSegmentConfig config) public CassandraDataSegmentPuller(CassandraDataSegmentConfig config)
{ {
this.astyanaxContext = new AstyanaxContext.Builder() super(config);
.forCluster(CLUSTER_NAME)
.forKeyspace(config.getKeyspace())
.withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE))
.withConnectionPoolConfiguration(
new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10)
.setSeeds(config.getHost())).withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
.buildKeyspace(ThriftFamilyFactory.getInstance());
this.astyanaxContext.start();
this.keyspace = this.astyanaxContext.getEntity();
indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME);
descriptorStorage = new ColumnFamily<String, String>(DESCRIPTOR_TABLE_NAME,
StringSerializer.get(), StringSerializer.get());
} }
@Override @Override
@ -103,27 +73,22 @@ public class CassandraDataSegmentPuller implements DataSegmentPuller
ObjectMetadata meta = null; ObjectMetadata meta = null;
try try
{ {
final File outFile = new File(outDir, toFilename(key, ".gz")); final File outFile = new File(outDir, "index.zip");
log.info("Writing to [" + outFile.getAbsolutePath() + "]");
OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput();
meta = ChunkedStorage meta = ChunkedStorage
.newReader(indexStorage, key, Files.newOutputStreamSupplier(outFile).getOutput()) .newReader(indexStorage, key, os)
.withBatchSize(BATCH_SIZE) .withBatchSize(BATCH_SIZE)
.withConcurrencyLevel(CONCURRENCY) .withConcurrencyLevel(CONCURRENCY)
.call(); .call();
os.close();
CompressionUtils.unzip(outFile, outDir);
} catch (Exception e) } catch (Exception e)
{ {
log.error("Could not pull segment [" + key + "] from C*", e); throw Throwables.propagate(e);
try
{
FileUtils.deleteDirectory(outDir);
} catch (IOException ioe)
{
log.error("Couldn't delete directory [" + outDir + "] for cleanup.", ioe);
} }
} log.info("Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime,
meta.getObjectSize());
log.info("Pull of file[%s] completed in %,d millis (%s chunks)", key, System.currentTimeMillis() - startTime,
meta.getChunkSize());
} }
@Override @Override
@ -145,11 +110,4 @@ public class CassandraDataSegmentPuller implements DataSegmentPuller
throw new SegmentLoadingException(e, e.getMessage()); throw new SegmentLoadingException(e, e.getMessage());
} }
} }
private String toFilename(String key, final String suffix)
{
String filename = key.substring(key.lastIndexOf("/") + 1);
filename = filename.substring(0, filename.length() - suffix.length());
return filename;
}
} }

View File

@ -24,12 +24,13 @@ import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider; import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
import com.netflix.astyanax.recipes.storage.ChunkedStorage; import com.netflix.astyanax.recipes.storage.ChunkedStorage;
import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider; import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
import com.netflix.astyanax.recipes.storage.ObjectMetadata;
import com.netflix.astyanax.serializers.StringSerializer; import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.thrift.ThriftFamilyFactory; import com.netflix.astyanax.thrift.ThriftFamilyFactory;
/** /**
* This is the data segment pusher for Cassandra. * Cassandra Segment Pusher
*
* @author boneill42
*/ */
// TODO: Auto-create the schema if it does not exist. // TODO: Auto-create the schema if it does not exist.
// Should we make it so they can specify tables? // Should we make it so they can specify tables?
@ -84,20 +85,22 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher
// Create index // Create index
final File compressedIndexFile = File.createTempFile("druid", "index.zip"); final File compressedIndexFile = File.createTempFile("druid", "index.zip");
long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile); long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile);
log.info("Wrote compressed file [%s] to [%s]", compressedIndexFile.getAbsolutePath(), key);
int version = IndexIO.getVersionFromDir(indexFilesDir); int version = IndexIO.getVersionFromDir(indexFilesDir);
try try
{ {
ObjectMetadata indexMeta = ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile)) long start = System.currentTimeMillis();
ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
.withConcurrencyLevel(CONCURRENCY).call(); .withConcurrencyLevel(CONCURRENCY).call();
byte[] json = jsonMapper.writeValueAsBytes(segment); byte[] json = jsonMapper.writeValueAsBytes(segment);
//CassandraDataSegmentDescriptor descriptor = new CassandraDataSegmentDescriptor(segment, json);
MutationBatch mutation = this.keyspace.prepareMutationBatch(); MutationBatch mutation = this.keyspace.prepareMutationBatch();
mutation.withRow(descriptorStorage, key) mutation.withRow(descriptorStorage, key)
.putColumn("lastmodified", System.currentTimeMillis(), null) .putColumn("lastmodified", System.currentTimeMillis(), null)
.putColumn("descriptor", json, null); .putColumn("descriptor", json, null);
mutation.execute(); mutation.execute();
log.info("Wrote index to C* [" + indexMeta.getParentPath() + "]"); log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start);
} catch (Exception e) } catch (Exception e)
{ {
throw new IOException(e); throw new IOException(e);

View File

@ -0,0 +1,71 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading.cassandra;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
/**
* Superclass for accessing Cassandra Storage.
*
* This is the schema used to support the index and descriptor storage:
*
* CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
* CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;
*/
public class CassandraStorage
{
private static final String CLUSTER_NAME = "druid_cassandra_cluster";
private static final String INDEX_TABLE_NAME = "index_storage";
private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage";
private AstyanaxContext<Keyspace> astyanaxContext;
Keyspace keyspace;
ChunkedStorageProvider indexStorage;
ColumnFamily<String, String> descriptorStorage;
public CassandraStorage(CassandraDataSegmentConfig config)
{
this.astyanaxContext = new AstyanaxContext.Builder()
.forCluster(CLUSTER_NAME)
.forKeyspace(config.getKeyspace())
.withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE))
.withConnectionPoolConfiguration(
new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10)
.setSeeds(config.getHost())).withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
.buildKeyspace(ThriftFamilyFactory.getInstance());
this.astyanaxContext.start();
this.keyspace = this.astyanaxContext.getEntity();
indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME);
descriptorStorage = new ColumnFamily<String, String>(DESCRIPTOR_TABLE_NAME,
StringSerializer.get(), StringSerializer.get());
}
}