这部分包含新的 Apache Kafka consumer API.
Apache Kafka 版本 0.10+
您可以通过创建 org.apache.storm.kafka.bolt.KafkaBolt 实例并将其作为组件附加到您的topology.如果您使用 trident ,您可以通过使用以下对象完成 org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and org.apache.storm.kafka.trident.TridentKafkaUpdater.
您需要为以下两个接口提供实现
###TupleToKafkaMapper 和 TridentTupleToKafkaMapper 这些接口有两个抽象方法:
K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);
顾名思义,这两个方法被调用将tuple映射到Kafka message的key和message本身. 如果你只想要一个字段 作为键和一个字段作为值,那么您可以使用提供的FieldNameBasedTupleToKafkaMapper.java 实现. 在KafkaBolt中,使用默认构造函数构造FieldNameBasedTupleToKafkaMapper需要一个字段名称为"key"和"message"的字段以实现向后兼容. 或者,您也可以使用非默认构造函数指定不同的键和消息字段. 在使用TridentKafkaState 时你必须明确key和message的字段名称,因为TridentKafkaState默认的构造函数没有设置参数.在构造FieldNameBasedTupleToKafkaMapper的实例时应明确这些.
###KafkaTopicSelector 和 trident KafkaTopicSelector 这个接口只有一个方法:
public interface KafkaTopicSelector {
String getTopics(Tuple/TridentTuple tuple);
}
该接口的实现应该要根据tuple的 key/message 返回相应的Kafka的topic,如果返回 null 则该消息将被忽略掉.如果您只需要一个静态topic名称,那么可以使用 DefaultTopicSelector.java 并在构造函数中设置topic的名称.
FieldNameTopicSelector
和 FieldIndexTopicSelector
用于选择 tuple 要发送到的topic,用户只需要指定tuple中存储 topic名称的字段名称或字段索引即可(即tuple中的某个字段是kafka topic的名称).当topic的名称不存在时, Field*TopicSelector
会将tuple写入到默认的topic.请确保默认topic已经在kafka中创建并且在Field*TopicSelector
正确设置.
你可以在 topology 通过调用 KafkaBolt.withProducerProperties()
和 TridentKafkaStateFactory.withProducerProperties()
设置kafka producer的所有属性. Kafka producer配置 选择 "Important configuration properties for the producer" 查看更多详情. 所有的kafka producer配置项的key都在 org.apache.kafka.clients.producer.ProducerConfig
类中
通过添加如下属性开启通配符匹配(此功能是为了storm可以动态读取多个kafka topic中的数据,并支持动态发现.看相关功能的实现需求feture)
Config config = new Config();
config.put("kafka.topic.wildcard.match",true);
之后,您可以指定一个通配符topic,例如clickstream.*.log. 这将匹配clickstream.my.log,clickstream.cart.log等topic
For the bolt :
TopologyBuilder builder = new TopologyBuilder();
Fields fields = new Fields("key", "message");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", "1"),
new Values("trident", "1"),
new Values("needs", "1"),
new Values("javadoc", "1")
);
spout.setCycle(true);
builder.setSpout("spout", spout, 5);
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaBolt bolt = new KafkaBolt()
.withProducerProperties(props)
.withTopicSelector(new DefaultTopicSelector("test"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
Config conf = new Config();
StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
For Trident:
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", "1"),
new Values("trident", "1"),
new Values("needs", "1"),
new Values("javadoc", "1")
);
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(props)
.withKafkaTopicSelector(new DefaultTopicSelector("test"))
.withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
Config conf = new Config();
StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
spout通过使用KafkaSpoutConfig
类来指定配置. 此类使用Builder模式,可以通过调用其中一个Builders构造函数或通过调用KafkaSpoutConfig类中的静态方法创建一个Builder.创建builder的构造方法或静态方法需要几个键值(稍后可以更改),但这是启动一个spout的所需的最小配置
bootstrapServers
与Kafka Consumer Property"bootstrap.servers"相同. 配置项`topics' 配置的是spout将消费的kafka topic.可以是特定主题名称(1个或多个)的集合列表或正则表达式"Pattern",它指定 任何与正则表达式匹配的主题都将被消费.
在构造函数的情况下,您可能还需要指定key deserializer和value deserializer. 这是为了通过使用Java泛型来保证类型安全. 默认值为"StringDeserializer",可以通过调用"setKeyDeserializer"和"setValueDeserializer"进行覆盖.如果这些设置为null,代码将回退到kafka属性中设置的内容,但最好在这里明确,通过使用Java泛型来确保类型安全.
下面是一些需要特别注意的关键配置项.
setFirstPollOffsetStrategy
允许你设置从哪里开始消费数据. 这在故障恢复和第一次启动spout的情况下会被使用. 可选的的值包括:
EARLIEST
无论之前的消费情况如何,spout会从每个kafka partition能找到的最早的offset开始的读取LATEST
无论之前的消费情况如何,spout会从每个kafka partition当前最新的offset开始的读取UNCOMMITTED_EARLIEST
(默认值) spout 会从每个partition的最后一次提交的offset开始读取. 如果offset不存在或者过期, 则会依照 EARLIEST
进行读取.UNCOMMITTED_LATEST
spout 会从每个partition的最后一次提交的offset开始读取, 如果offset不存在或者过期, 则会依照 LATEST
进行读取.setRecordTranslator
可以修改spout如何将Kafka消费者message转换为tuple,以及将该tuple发布到哪个stream中.默认情况下,"topic","partition","offset","key"和"value"将被发送到"default"stream. 如果要将条目根据topic输出到不同的stream中,Storm提供了"ByTopicRecordTranslator". 有关如何使用这些的更多示例,请参阅下文. setProp
可用于设置kafka属性. setGroupId
可以让您设置kafka使用者组属性"group.id". setSSLKeystore
和setSSLTruststore
允许你配置SSL认证.
API是用java 8 lambda表达式写的. 它也可以用于java7及更低的版本.
以下将消费kafka中"demo_topic"的所有消息,并将其发送到MyBolt,其中包含"topic","partition","offset","key","value".
final TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "demo_topic").build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
...
通配符 topics 将消费所有符合通配符的topics. 在下面的例子中 "topic", "topic_foo" 和 "topic_bar" 适配通配符 "topic.*", 但是 "not_my_topic" 并不适配.
final TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, Pattern.compile("topic.*")).build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
...
这个案例使用 java 8 lambda 表达式.
final TopologyBuilder tp = new TopologyBuilder();
//默认情况下,spout 消费但未被match到的topic的message的"topic","key"和"value"将发送到"STREAM_1"
ByTopicRecordTranslator<String, String> byTopic = new ByTopicRecordTranslator<>(
(r) -> new Values(r.topic(), r.key(), r.value()),
new Fields("topic", "key", "value"), "STREAM_1");
//topic_2 所有的消息的 "key" and "value" 将发送到 "STREAM_2"中
byTopic.forTopic("topic_2", (r) -> new Values(r.key(), r.value()), new Fields("key", "value"), "STREAM_2");
tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic_1", "topic_2", "topic_3").build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout", "STREAM_1");
tp.setBolt("another", new myOtherBolt()).shuffleGrouping("kafka_spout", "STREAM_2");
...
final TridentTopology tridentTopology = new TridentTopology();
final Stream spoutStream = tridentTopology.newStream("kafkaSpout",
new KafkaTridentSpoutOpaque<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, Pattern.compile("topic.*")).build()))
.parallelismHint(1)
...
Trident不支持多个stream且不支持设置将strem分发到多个output. 并且,如果每个output 的topic的字段不一致会抛出异常而不会继续.
在大多数情况下,内置的SimpleRecordTranslator和ByTopicRecordTranslator应该满足您的使用. 如果您遇到需要定制的情况那么这个文档将会描述如何正确地做到这一点,涉及到一些不太常用的类.适用的要点是使用ConsumerRecord并将其转换为可以emitted 的"List