Fix hierarchy in Pusher (to use CassandraStorage superclass)

Brian O'Neill 2013-05-08 15:50:58 -04:00
parent 10a96626d4
commit 3eb0f4dfca
2 changed files with 8 additions and 41 deletions
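
For orientation: after this change, CassandraDataSegmentPusher no longer opens its own Astyanax connection; it inherits keyspace, indexStorage, descriptorStorage and config from the CassandraStorage base class and keeps its own ObjectMapper. A minimal sketch of the resulting class, assembled from the first hunk below (imports and method bodies are omitted):

// Sketch only, assembled from the diff below; imports and method bodies omitted.
public class CassandraDataSegmentPusher extends CassandraStorage implements DataSegmentPusher
{
  private static final Logger log = new Logger(CassandraDataSegmentPusher.class);
  private final ObjectMapper jsonMapper;

  public CassandraDataSegmentPusher(
      CassandraDataSegmentConfig config,
      ObjectMapper jsonMapper)
  {
    super(config);                // connection setup now happens in CassandraStorage
    this.jsonMapper = jsonMapper;
  }
}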

CassandraDataSegmentPusher.java

@@ -13,64 +13,30 @@ import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.DataSegmentPusherUtil;
import com.metamx.druid.utils.CompressionUtils;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;
import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
/**
* Cassandra Segment Pusher
*
* @author boneill42
*/
// TODO: Auto-create the schema if it does not exist.
// Should we make it so they can specify tables?
public class CassandraDataSegmentPusher implements DataSegmentPusher
public class CassandraDataSegmentPusher extends CassandraStorage implements DataSegmentPusher
{
private static final Logger log = new Logger(CassandraDataSegmentPusher.class);
private static final String CLUSTER_NAME = "druid_cassandra_cluster";
private static final String INDEX_TABLE_NAME = "index_storage";
private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage";
private static final int CONCURRENCY = 10;
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private final CassandraDataSegmentConfig config;
private final ObjectMapper jsonMapper;
private Keyspace keyspace;
private AstyanaxContext<Keyspace> astyanaxContext;
private ChunkedStorageProvider indexStorage;
private ColumnFamily<String, String> descriptorStorage;
public CassandraDataSegmentPusher(
CassandraDataSegmentConfig config,
ObjectMapper jsonMapper)
{
this.config = config;
this.jsonMapper = jsonMapper;
this.astyanaxContext = new AstyanaxContext.Builder()
.forCluster(CLUSTER_NAME)
.forKeyspace(config.getKeyspace())
.withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE))
.withConnectionPoolConfiguration(
new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10)
.setSeeds(config.getHost())).withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
.buildKeyspace(ThriftFamilyFactory.getInstance());
this.astyanaxContext.start();
this.keyspace = this.astyanaxContext.getEntity();
descriptorStorage = new ColumnFamily<String, String>(DESCRIPTOR_TABLE_NAME,
StringSerializer.get(), StringSerializer.get());
indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME);
super(config);
this.jsonMapper=jsonMapper;
}
@Override
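
The second file's hunk below shows only the tail of the CassandraStorage change (the storage fields becoming final plus the new config field). For context, here is a sketch of the consolidated base-class constructor once the connection setup deleted from the Pusher above lives only there; the builder chain, pool name and table names are taken from the deleted lines, and the constants (CLUSTER_NAME, INDEX_TABLE_NAME, DESCRIPTOR_TABLE_NAME) are assumed to keep the same values in CassandraStorage:

// Sketch of the CassandraStorage constructor after this commit; the Astyanax
// builder chain mirrors the code removed from the Pusher above, and the
// constants are assumed to be defined in CassandraStorage with the same values.
public CassandraStorage(CassandraDataSegmentConfig config)
{
  this.astyanaxContext = new AstyanaxContext.Builder()
      .forCluster(CLUSTER_NAME)
      .forKeyspace(config.getKeyspace())
      .withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE))
      .withConnectionPoolConfiguration(
          new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10)
              .setSeeds(config.getHost()))
      .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
      .buildKeyspace(ThriftFamilyFactory.getInstance());
  this.astyanaxContext.start();
  this.keyspace = this.astyanaxContext.getEntity();
  this.config = config;                // new in this commit
  indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME);
  descriptorStorage = new ColumnFamily<String, String>(DESCRIPTOR_TABLE_NAME,
      StringSerializer.get(), StringSerializer.get());
}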

CassandraStorage.java

@@ -46,9 +46,10 @@ public class CassandraStorage
private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage";
private AstyanaxContext<Keyspace> astyanaxContext;
Keyspace keyspace;
ChunkedStorageProvider indexStorage;
ColumnFamily<String, String> descriptorStorage;
final Keyspace keyspace;
final ChunkedStorageProvider indexStorage;
final ColumnFamily<String, String> descriptorStorage;
final CassandraDataSegmentConfig config;
public CassandraStorage(CassandraDataSegmentConfig config)
{
@@ -62,7 +63,7 @@ public class CassandraStorage
.buildKeyspace(ThriftFamilyFactory.getInstance());
this.astyanaxContext.start();
this.keyspace = this.astyanaxContext.getEntity();
this.config = config;
indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME);
descriptorStorage = new ColumnFamily<String, String>(DESCRIPTOR_TABLE_NAME,