
Getting the topic of each Kafka message in Flink

At work I often need to use Flink to consume data from Kafka and find out which topic each message came from. Here is a summary of how to do it. The key is a custom KafkaDeserializationSchema, which (unlike the plain DeserializationSchema) receives the whole ConsumerRecord and therefore has access to the topic name:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
	// Whether nextElement is the last element of the stream. Always return false,
	// because we want messages to keep being consumed indefinitely.
	@Override
	public boolean isEndOfStream(Tuple2<String, String> nextElement) {
		return false;
	}

	// Deserialize the Kafka record: return a Tuple2 of (topic, message value).
	// NOTE: record.value() can be null for tombstone records; add a null check
	// if your topics use deletions.
	@Override
	public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
		return new Tuple2<>(record.topic(), new String(record.value(), StandardCharsets.UTF_8));
	}

	// Tell Flink the type of the produced data, so its type inference works.
	@Override
	public TypeInformation<Tuple2<String, String>> getProducedType() {
		return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
	}
}
The job below wires the schema into a Kafka source; each element is a (topic, value) pair, so value.f0 is the topic name:

import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	Properties properties = new Properties();
	properties.setProperty("bootstrap.servers", "localhost:9092");
	properties.setProperty("group.id", "test");

	FlinkKafkaConsumer<Tuple2<String, String>> kafkaConsumer =
			new FlinkKafkaConsumer<>("test", new CustomKafkaDeserializationSchema(), properties);
	kafkaConsumer.setStartFromEarliest();

	env.addSource(kafkaConsumer).flatMap(new FlatMapFunction<Tuple2<String, String>, Object>() {
		@Override
		public void flatMap(Tuple2<String, String> value, Collector<Object> out) throws Exception {
			// f0 is the topic, f1 is the message payload
			System.out.println("topic==== " + value.f0);
		}
	});

	// execute program
	env.execute("Flink Streaming Java API Skeleton");
}
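
On newer Flink releases, where FlinkKafkaConsumer is deprecated in favor of KafkaSource, the schema above can be reused as-is. The following is a minimal sketch, assuming Flink 1.14+ with the flink-connector-kafka dependency on the classpath (the topic name "test" and server address are placeholders):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaTopicPerMessage {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// The deserializer still sees the raw ConsumerRecord, so the topic
		// is available exactly as in the KafkaDeserializationSchema above.
		KafkaSource<Tuple2<String, String>> source = KafkaSource.<Tuple2<String, String>>builder()
				.setBootstrapServers("localhost:9092")
				.setTopics("test")
				.setGroupId("test")
				.setStartingOffsets(OffsetsInitializer.earliest())
				.setDeserializer(KafkaRecordDeserializationSchema.of(new CustomKafkaDeserializationSchema()))
				.build();

		env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
				.map(value -> value.f0)   // extract the topic name
				.returns(Types.STRING)    // type hint, since lambdas lose generic info
				.print();

		env.execute("topic-per-message");
	}
}

KafkaRecordDeserializationSchema.of(...) wraps the existing KafkaDeserializationSchema unchanged, so the topic-extraction logic stays in one place no matter which connector API the job uses.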