Writing SpringCloud Microservice Logs to Kafka
SpringCloud microservice logs can be written to a file and shipped to Logstash through Filebeat, or sent to Logstash directly. Writing them to Kafka instead exploits Kafka's high throughput and performance to keep logging latency out of the request path; the logs are then consumed into Logstash asynchronously.
Start the Kafka service
Start ZooKeeper
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
Start Kafka
./bin/kafka-server-start.sh ./config/server.properties
Create the topic
bin/kafka-topics.sh --create --topic logger-channel --bootstrap-server localhost:9092
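To confirm that the topic was created, describe it with the same script:
bin/kafka-topics.sh --describe --topic logger-channel --bootstrap-server localhost:9092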
Start a console consumer
./bin/kafka-console-consumer.sh --topic logger-channel --bootstrap-server localhost:9092
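The same check can be done from code with the kafka-clients API added in the next section; a minimal sketch, where the class name LogTopicConsumer and the group id log-viewer are arbitrary:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LogTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-viewer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // poll the log topic and print each JSON log event to stdout
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("logger-channel"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}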
Spring Boot service
Add dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
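kafka-clients only provides the Kafka producer itself; the LayoutKafkaMessageEncoder and the Logstash-style JSON layout used in the logback configuration below come from the logback-kafka-appender and logstash-logback-encoder libraries. A sketch of the extra coordinates (the version numbers here are examples, not taken from the original project):
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.2.0-RC2</version>
</dependency>
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>6.6</version>
</dependency>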
Write logback-spring.xml
The configuration encodes each log event with a LayoutKafkaMessageEncoder, wraps the Kafka appender in an AsyncAppender so that logging stays asynchronous, and sets topic to the logger-channel topic created above. A logback-spring.xml along these lines produces the output shown at the end (the KafkaAppender and LogstashLayout class names are the standard logback-kafka-appender / logstash-logback-encoder classes and are assumed here, as is the exact mapping of the true/false flags):

<configuration>
    <contextName>kafka-log-test</contextName>
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${logDir}/${logName}.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logDir}/history/${myspringboottest_log}.%d{yyyy-MM-dd}.rar</fileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <!-- JSON log events published to the logger-channel topic -->
    <appender name="KAFKA" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
            <layout class="net.logstash.logback.layout.LogstashLayout">
                <includeContext>false</includeContext>
                <includeCallerData>true</includeCallerData>
                <customFields>{"appName":"${applicationName}","env":"${profileActive}"}</customFields>
            </layout>
            <charset>UTF-8</charset>
        </encoder>
        <topic>logger-channel</topic>
        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
    </appender>
    <!-- decouple application threads from Kafka delivery -->
    <appender name="ASYNC_KAFKA" class="ch.qos.logback.classic.AsyncAppender">
        <neverBlock>true</neverBlock>
        <includeCallerData>true</includeCallerData>
        <discardingThreshold>0</discardingThreshold>
        <queueSize>2048</queueSize>
        <appender-ref ref="KAFKA"/>
    </appender>
    <root level="debug">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="FILE"/>
        <appender-ref ref="ASYNC_KAFKA"/>
    </root>
</configuration>
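The customFields entry references ${applicationName} and ${profileActive}; in a typical Spring Boot setup these are exposed to logback with springProperty declarations near the top of the file, for example (the property sources spring.application.name and spring.profiles.active are assumptions):
    <springProperty scope="context" name="applicationName" source="spring.application.name"/>
    <springProperty scope="context" name="profileActive" source="spring.profiles.active"/>
The ${logDir} and ${logName} variables used by the file appender likewise have to be defined, e.g. with <property> elements or further springProperty entries.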
Test controller: KafkaLogController (log is Lombok's @Slf4j logger; RandomUtil is assumed to be Hutool's)
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class KafkaLogController {

    @GetMapping("/kafka")
    public String kafka() {
        // log a random value at INFO and DEBUG; both events should reach Kafka
        int time = RandomUtil.randomInt(0, 100);
        log.info("cost time: " + time);
        log.debug("debug time: " + time);
        return "cost time: " + time;
    }
}
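The endpoint can be exercised with any HTTP client; the service port 8080 below matches the http-nio-8080 thread name in the log output:
curl http://localhost:8080/kafka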
Each request makes the console consumer print two messages, one INFO and one DEBUG:
{"@timestamp":"2021-08-01T14:47:52.068+08:00","@version":1,"message":"cost time: 32","logger":"com.paw.kafka.elk.controller.KafkaLogController","thread":"http-nio-8080-exec-2","level":"INFO","levelVal":20000,"caller":{"class":"com.paw.kafka.elk.controller.KafkaLogController","method":"kafka","file":"KafkaLogController.java","line":35},"appName":"paw-kelk","env":"dev"}
{"@timestamp":"2021-08-01T14:47:52.068+08:00","@version":1,"message":"debug time: 32","logger":"com.paw.kafka.elk.controller.KafkaLogController","thread":"http-nio-8080-exec-2","level":"DEBUG","levelVal":10000,"caller":{"class":"com.paw.kafka.elk.controller.KafkaLogController","method":"kafka","file":"KafkaLogController.java","line":36},"appName":"paw-kelk","env":"dev"}
The SpringCloud service's logs are now written to Kafka; from there they can be consumed asynchronously into Logstash.