compression for batch-zk

This commit is contained in:
fjy 2013-07-22 14:05:48 -07:00
parent 5d96f6dc99
commit f5536ed010
5 changed files with 74 additions and 39 deletions

View File

@ -373,18 +373,21 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
if (requestLogger == null) {
try {
final String loggingType = props.getProperty("druid.request.logging.type");
if("emitter".equals(loggingType)) {
setRequestLogger(Initialization.makeEmittingRequestLogger(
getProps(),
getEmitter()
));
}
else if ("file".equalsIgnoreCase(loggingType)) {
setRequestLogger(Initialization.makeFileRequestLogger(
getJsonMapper(),
getScheduledExecutorFactory(),
getProps()
));
if ("emitter".equals(loggingType)) {
setRequestLogger(
Initialization.makeEmittingRequestLogger(
getProps(),
getEmitter()
)
);
} else if ("file".equalsIgnoreCase(loggingType)) {
setRequestLogger(
Initialization.makeFileRequestLogger(
getJsonMapper(),
getScheduledExecutorFactory(),
getProps()
)
);
} else {
setRequestLogger(new NoopRequestLogger());
}
@ -428,19 +431,46 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s"));
lifecycle.addManagedInstance(announcer);
setAnnouncer(
new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Arrays.<AbstractDataSegmentAnnouncer>asList(
new BatchingCuratorDataSegmentAnnouncer(
getDruidServerMetadata(),
getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class),
announcer,
getJsonMapper()
),
new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper())
)
)
);
final ZkDataSegmentAnnouncerConfig config = getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class);
final String announcerType = config.getAnnouncerType();
final DataSegmentAnnouncer dataSegmentAnnouncer;
if ("curator".equalsIgnoreCase(announcerType)) {
dataSegmentAnnouncer = new CuratorDataSegmentAnnouncer(
getDruidServerMetadata(),
getZkPaths(),
announcer,
getJsonMapper()
);
} else if ("batch".equalsIgnoreCase(announcerType)) {
dataSegmentAnnouncer = new BatchingCuratorDataSegmentAnnouncer(
getDruidServerMetadata(),
config,
announcer,
getJsonMapper()
);
} else if ("multiple".equalsIgnoreCase(announcerType)) {
dataSegmentAnnouncer = new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Arrays.<AbstractDataSegmentAnnouncer>asList(
new BatchingCuratorDataSegmentAnnouncer(
getDruidServerMetadata(),
config,
announcer,
getJsonMapper()
),
new CuratorDataSegmentAnnouncer(
getDruidServerMetadata(),
getZkPaths(),
announcer,
getJsonMapper()
)
)
);
} else {
throw new ISE("Unknown announcer type [%s]", announcerType);
}
setAnnouncer(dataSegmentAnnouncer);
lifecycle.addManagedInstance(getAnnouncer(), Lifecycle.Stage.LAST);
}

View File

@ -32,4 +32,8 @@ public abstract class CuratorConfig
@Config("druid.zk.service.sessionTimeoutMs")
@Default("30000")
public abstract int getZkSessionTimeoutMs();
@Config("druid.curator.compression.enable")
@Default("false")
public abstract boolean enableCompression();
}

View File

@ -72,13 +72,13 @@ public class Initialization
/**
* Load properties.
* Properties are layered:
*
* <p/>
* # stored in zookeeper
* # runtime.properties file,
* # cmdLine -D
*
* <p/>
* command line overrides runtime.properties which overrides zookeeper
*
* <p/>
* Idempotent. Thread-safe. Properties are only loaded once.
* If property druid.zk.service.host is not set then do not load properties from zookeeper.
*
@ -196,10 +196,10 @@ public class Initialization
CuratorFrameworkFactory.builder()
.connectString(curatorConfig.getZkHosts())
.sessionTimeoutMs(curatorConfig.getZkSessionTimeoutMs())
.retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
// Don't compress stuff written just yet, need to get code deployed first.
.compressionProvider(new PotentiallyGzippedCompressionProvider(false))
.build();
.retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
// Don't compress stuff written just yet, need to get code deployed first.
.compressionProvider(new PotentiallyGzippedCompressionProvider(curatorConfig.enableCompression()))
.build();
lifecycle.addHandler(
new Lifecycle.Handler()
@ -335,9 +335,9 @@ public class Initialization
}
public static RequestLogger makeFileRequestLogger(
ObjectMapper objectMapper,
ScheduledExecutorFactory factory,
Properties props
ObjectMapper objectMapper,
ScheduledExecutorFactory factory,
Properties props
) throws IOException
{
return new FileRequestLogger(

View File

@ -14,4 +14,8 @@ public abstract class ZkDataSegmentAnnouncerConfig extends ZkPathsConfig
@Config("druid.zk.maxNumBytesPerNode")
@Default("512000")
public abstract long getMaxNumBytes();
@Config("druid.announcer.type")
@Default("curator")
public abstract String getAnnouncerType();
}

View File

@ -143,10 +143,7 @@ public class SpatialDimensionRowFormatter
@Override
public String toString()
{
return "InputRow{" +
"timestamp=" + row.getTimestampFromEpoch() +
", dimensions=" + row.getDimensions() +
'}';
return row.toString();
}
};