Formatting.

This commit is contained in:
Brian O'Neill 2013-05-01 15:17:15 -04:00
parent bf01399a2a
commit b8c8ed8d68
1 changed files with 33 additions and 35 deletions

View File

@ -26,13 +26,12 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher
private static final Logger log = new Logger(CassandraDataSegmentPusher.class);
private final CassandraDataSegmentPusherConfig config;
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private final ObjectMapper jsonMapper;
private Cluster cluster;
private Session session;
private String keyspace = null;
private String table = null;
public CassandraDataSegmentPusher(
CassandraDataSegmentPusherConfig config,
@ -42,7 +41,7 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher
this.jsonMapper = jsonMapper;
this.keyspace = this.config.getKeyspace();
this.table = this.config.getTable();
cluster = Cluster.builder().addContactPoints(this.config.getHost()).build();
session = cluster.connect();
session.execute("USE " + keyspace);
@ -51,39 +50,38 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher
@Override
public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
{
log.info("Writing [%s] to C*", indexFilesDir);
String key = JOINER.join(
config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
DataSegmentPusherUtil.getStorageDir(segment)
);
log.info("Writing [%s] to C*", indexFilesDir);
String key = JOINER.join(
config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
DataSegmentPusherUtil.getStorageDir(segment)
);
// Create index
final File compressedIndexFile = File.createTempFile("druid", "index.zip");
long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile);
int version = IndexIO.getVersionFromDir(indexFilesDir);
// Create descriptor
File descriptorFile = File.createTempFile("druid", "descriptor.json");
Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
String statement = "INSERT INTO " + keyspace + "." + table + "(key, version, descriptor, index) VALUES (?,?,?,?)";
PreparedStatement ps = session.prepare(statement);
byte[] indexData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile));
byte[] descriptorData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile));
BoundStatement bs = ps.bind(key, version, descriptorData, indexData);
session.execute(bs);
segment = segment.withSize(indexSize)
.withLoadSpec(
ImmutableMap.<String, Object>of("type", "c*", "key", key)
)
.withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));
// Create index
final File compressedIndexFile = File.createTempFile("druid", "index.zip");
long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile);
int version = IndexIO.getVersionFromDir(indexFilesDir);
log.info("Deleting zipped index File[%s]", compressedIndexFile);
compressedIndexFile.delete();
log.info("Deleting descriptor file[%s]", descriptorFile);
descriptorFile.delete();
return segment;
// Create descriptor
File descriptorFile = File.createTempFile("druid", "descriptor.json");
Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
String statement = "INSERT INTO " + keyspace + "." + table + "(key, version, descriptor, index) VALUES (?,?,?,?)";
PreparedStatement ps = session.prepare(statement);
byte[] indexData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile));
byte[] descriptorData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile));
BoundStatement bs = ps.bind(key, version, descriptorData, indexData);
session.execute(bs);
segment = segment.withSize(indexSize)
.withLoadSpec(
ImmutableMap.<String, Object> of("type", "c*", "key", key)
)
.withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));
log.info("Deleting zipped index File[%s]", compressedIndexFile);
compressedIndexFile.delete();
log.info("Deleting descriptor file[%s]", descriptorFile);
descriptorFile.delete();
return segment;
}
}