diff --git a/extensions/druid-rocketmq/pom.xml b/extensions/druid-rocketmq/pom.xml
new file mode 100644
index 00000000000..a02bd191467
--- /dev/null
+++ b/extensions/druid-rocketmq/pom.xml
@@ -0,0 +1,50 @@
+
+
+
+ 4.0.0
+
+ druid
+ io.druid
+ 0.9.0-SNAPSHOT
+ ../../pom.xml
+
+
+
+ 3.2.6
+
+
+ druid-rocketmq
+
+
+
+
+ com.alibaba.rocketmq
+ rocketmq-client
+ ${rocketmq.version}
+
+
+
+ io.druid
+ druid-api
+
+
+
+
+
\ No newline at end of file
diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java
new file mode 100644
index 00000000000..159928912a9
--- /dev/null
+++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.firehose.rocketmq;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import io.druid.initialization.DruidModule;
+
+import java.util.List;
+
+public class RocketMQDruidModule implements DruidModule {
+
+ @Override
+ public List extends Module> getJacksonModules() {
+ return ImmutableList.of(
+ new SimpleModule("RocketMQFirehoseModule")
+ .registerSubtypes(
+ new NamedType(RocketMQFirehoseFactory.class, "RocketMQ-3.2.6")
+ )
+ );
+ }
+
+ @Override
+ public void configure(Binder binder) {
+
+ }
+}
diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java
new file mode 100644
index 00000000000..7d930e5c85b
--- /dev/null
+++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.firehose.rocketmq;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
+import com.alibaba.rocketmq.client.consumer.MessageQueueListener;
+import com.alibaba.rocketmq.client.consumer.PullResult;
+import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
+import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.ServiceThread;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Sets;
+import com.metamx.common.parsers.ParseException;
+import io.druid.data.input.ByteBufferInputRowParser;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+
+public class RocketMQFirehoseFactory implements FirehoseFactory {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQFirehoseFactory.class);
+
+ /**
+ * Passed in configuration for consumer client.
+ */
+ @JsonProperty
+ private final Properties consumerProps;
+
+ /**
+ * Consumer group.
+ */
+ @JsonProperty
+ private final String consumerGroup;
+
+ /**
+ * Topics to consume.
+ * Multiple topics are separated by comma ",".
+ */
+ @JsonProperty
+ private final String feed;
+
+
+ /**
+ * Store messages that are fetched from brokers but not yet delivered to druid via fire hose.
+ */
+ private ConcurrentHashMap> messageQueueTreeSetMap = new ConcurrentHashMap<>();
+
+ /**
+ * Store message consuming status.
+ */
+ private ConcurrentHashMap> windows = new ConcurrentHashMap<>();
+
+ /**
+ * Default pull batch size.
+ */
+ private static final int PULL_BATCH_SIZE = 32;
+
+ @JsonCreator
+ public RocketMQFirehoseFactory(@JsonProperty("consumerProps") Properties consumerProps,
+ @JsonProperty("consumerGroup") String consumerGroup,
+ @JsonProperty("feed") String feed) {
+ this.consumerProps = consumerProps;
+ for (Map.Entry