Running our Spring Boot services on k8s: syncing data to ClickHouse with Flume
A brief introduction to ClickHouse (see the official documentation for details):
1) ClickHouse is not part of the Hadoop ecosystem.
2) It is queried with SQL, so getting started is relatively easy for anyone familiar with relational databases.
3) ClickHouse is best used for reads; avoid in-place updates, and do writes in batches.
4) All kinds of log data can be synced into ClickHouse via Flume for unified management and user-behavior analysis.
5) Incremental sync from MySQL to ClickHouse. One thought here: immutable data such as system logs, transaction logs, user-behavior logs, and already-generated orders seems well suited to being synced into ClickHouse for reporting, statistics, and analysis.
Since users mostly query and operate on recent or current data, the mutable, transaction-sensitive portion can stay in MySQL. Sync MySQL into ClickHouse, operate on the recent and changing data in MySQL, and serve the bulk of the historical data from ClickHouse, relieving the relational database's performance bottleneck.
6) Facing a tangle of heterogeneous data sources, Flink can be used to funnel everything into ClickHouse.
All of the above needs to be tested against your actual situation; after all, practice is the sole criterion of truth.
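The hot/cold split described in point 5 can be made concrete with a toy router. Everything below is an assumption for illustration only: the 30-day boundary, the class and method names, and the idea of routing by row age are not part of any real design in this post.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Toy illustration of the hot/cold split: recent, mutable rows stay in MySQL,
// older immutable rows are served from ClickHouse. The 30-day cutoff is an
// assumed tuning parameter, not something prescribed by the post.
public class StoreRouter {
    static final long HOT_DAYS = 30;

    // Returns which store a query for rows created on 'rowDate' should hit.
    public static String route(LocalDate rowDate, LocalDate today) {
        long ageDays = ChronoUnit.DAYS.between(rowDate, today);
        return ageDays <= HOT_DAYS ? "mysql" : "clickhouse";
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2020, 6, 1);
        System.out.println(route(LocalDate.of(2020, 5, 20), today)); // mysql
        System.out.println(route(LocalDate.of(2019, 1, 1), today));  // clickhouse
    }
}
```

In practice the boundary would usually be expressed in the data-access layer (e.g. choosing a DataSource), but the decision itself is just this comparison.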
A brief introduction to Flume (see the official documentation for details):
1) A highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive amounts of log data.
2) It can watch many kinds of files or directories for changes, pick up the changed data, and push it to different receiving middleware.
3) Flume provides a range of plugins for this, e.g. listening on a TCP port as a data source, or watching log files under a directory for changes.
4) Flume can push the changed data into a queue that then distributes it to various data-warehouse middleware, or it can write directly to the warehouse without a queue.
5) We can also implement custom Flume NG components to cover our own special synchronization needs.
This post covers:
1. Implementing a custom Flume NG sink.
2. Shipping the logs our system generates into ClickHouse through that sink.
The core code follows:
1) Create the table in ClickHouse:
CREATE TABLE default.sys_log (
`id` String,
`sys_name` String,
`level` String,
`msg` String,
`thread` String,
`create_date` DateTime,
`exe_date` DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_date)
ORDER BY exe_date;
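The sink shown later ships rows in ClickHouse's JSONEachRow format: one JSON object per row, with keys matching the column names above. A hand-rolled sketch of building such a row (illustrative only; the real project builds this via its own StringUtil/JsonUtil helpers, and the assumption that DateTime values are sent as "yyyy-MM-dd HH:mm:ss" strings is mine):

```java
public class SysLogRow {
    // Build one JSONEachRow line for default.sys_log. DateTime columns are
    // assumed to be sent as "yyyy-MM-dd HH:mm:ss" strings, which ClickHouse
    // parses into DateTime. No JSON escaping is done here; a real
    // implementation should use a JSON library such as fastjson.
    public static String toJsonEachRow(String id, String sysName, String level,
                                       String msg, String thread,
                                       String createDate, String exeDate) {
        return String.format(
            "{\"id\":\"%s\",\"sys_name\":\"%s\",\"level\":\"%s\",\"msg\":\"%s\","
            + "\"thread\":\"%s\",\"create_date\":\"%s\",\"exe_date\":\"%s\"}",
            id, sysName, level, msg, thread, createDate, exeDate);
    }

    public static void main(String[] args) {
        System.out.println(toJsonEachRow("1", "demo", "INFO", "service started",
                "main", "2020-06-01 10:00:00", "2020-06-01 10:00:00"));
    }
}
```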
2)自定义flume sink pom.xml
org.apache.flume
flume-ng-core
com.alibaba
fastjson
com.google.guava
guava
org.apache.flume
flume-ng-configuration
org.apache.flume
flume-ng-sdk
com.opencsv
opencsv
4.2
ru.yandex.clickhouse
clickhouse-jdbc
0.2.4
org.projectlombok
lombok
3) The core of the custom Flume sink (excerpt; the class extends Flume's AbstractSink and implements Configurable, imports omitted):
private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSink.class);
private BalancedClickhouseDataSource dataSource = null;
private SinkCounter sinkCounter = null;
private String host = null;
private String port = null;
private String user = null;
private String password = null;
private String database = null;
private String table = null;
private int batchSize;
@Override
public Status process() throws EventDeliveryException {
Status status = null;
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
List insertData = new ArrayList<>();
try {
ClickHouseConnectionImpl conn = (ClickHouseConnectionImpl) dataSource.getConnection();
int count;
for (count = 0; count < batchSize; ++count) {
Event event = ch.take();
if (event == null) {
break;
}
insertData.add(StringUtil.buildLog(new String(event.getBody(), StandardCharsets.UTF_8)));
}
if (count <= 0) {
sinkCounter.incrementBatchEmptyCount();
txn.commit();
return Status.BACKOFF;
} else if (count < batchSize) {
sinkCounter.incrementBatchUnderflowCount();
} else {
sinkCounter.incrementBatchCompleteCount();
}
sinkCounter.addToEventDrainAttemptCount(count);
ClickHouseStatement sth = conn.createStatement();
sth.write().table(String.format(" %s.%s", database, table)).data(new ByteArrayInputStream(JsonUtil.t2JsonString(insertData).getBytes()), ClickHouseFormat.JSONEachRow).addDbParam(ClickHouseQueryParam.MAX_PARALLEL_REPLICAS, MAX_PARALLEL_REPLICAS_VALUE).send();
sinkCounter.incrementEventDrainSuccessCount();
status = Status.READY;
txn.commit();
} catch (Throwable t) {
txn.rollback();
LOGGER.error(t.getMessage(), t);
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error) t;
}
} finally {
txn.close();
}
return status;
}
@Override
public void configure(Context context) {
if (sinkCounter == null) {
sinkCounter = new SinkCounter(getName());
}
Preconditions.checkArgument(context.getString(HOST) != null && context.getString(HOST).length() > 0, "ClickHouse host must be specified!");
this.host = context.getString(HOST);
if (!this.host.startsWith(CLICK_HOUSE_PREFIX)) {
this.host = CLICK_HOUSE_PREFIX + this.host;
}
Preconditions.checkArgument(context.getString(DATABASE) != null && context.getString(DATABASE).length() > 0, "ClickHouse database must be specified!");
this.database = context.getString(DATABASE);
Preconditions.checkArgument(context.getString(TABLE) != null && context.getString(TABLE).length() > 0, "ClickHouse table must be specified!");
this.table = context.getString(TABLE);
this.port = context.getString(PORT, DEFAULT_PORT);
this.user = context.getString(USER, DEFAULT_USER);
this.password = context.getString(PASSWORD, DEFAULT_PASSWORD);
this.batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
}
@Override
public void start() {
LOGGER.info("clickHouse sink {} starting", getName());
String jdbcUrl = String.format("%s:%s/%s", this.host, this.port, this.database);
ClickHouseProperties properties = new ClickHouseProperties().withCredentials(this.user, this.password);
this.dataSource = new BalancedClickhouseDataSource(jdbcUrl, properties);
sinkCounter.start();
super.start();
LOGGER.info("clickHouse sink {} started", getName());
}
@Override
public void stop() {
LOGGER.info("clickHouse sink {} stopping", getName());
sinkCounter.incrementConnectionClosedCount();
sinkCounter.stop();
super.stop();
LOGGER.info("clickHouse sink {} stopped", getName());
}
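The take-loop in process() decides the sink's status from how many events it managed to pull in one channel transaction: an empty batch backs off so the sink runner sleeps, a partial batch is still written, and a full batch means more data may be waiting. That decision can be summarized as a pure function (a sketch mirroring the branches above, with outcome names of my own choosing):

```java
public class BatchOutcome {
    public enum Kind { EMPTY_BACKOFF, UNDERFLOW_READY, COMPLETE_READY }

    // Mirrors the branches in process(): no events taken -> BACKOFF (and the
    // empty-batch counter is bumped); fewer than batchSize -> underflow but
    // still READY; exactly batchSize -> complete batch, READY.
    public static Kind classify(int taken, int batchSize) {
        if (taken <= 0) {
            return Kind.EMPTY_BACKOFF;
        }
        return taken < batchSize ? Kind.UNDERFLOW_READY : Kind.COMPLETE_READY;
    }
}
```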
4) The corresponding Flume configuration:
# Name the agent's components
a1.sources = r1
a1.sinks = sink1
a1.channels = c1
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/spark/flume/data/log
a1.sources.r1.fileHeader = false
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# The custom Flume sink
a1.sinks.sink1.type = com.zhy.frame.newsql.clickhouse.sink.sink.ClickHouseSink
a1.sinks.sink1.host = localhost
a1.sinks.sink1.port = 8123
a1.sinks.sink1.database = default
a1.sinks.sink1.table = sys_log
a1.sinks.sink1.batchSize = 10000
a1.sinks.sink1.user = default
a1.sinks.sink1.password =
# The Flume channel. Note: the sink takes up to batchSize events inside one
# channel transaction, so transactionCapacity must be >= the sink's batchSize
# (and capacity >= transactionCapacity); the sizes below reflect that.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.sink1.channel = c1
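A sink's take-loop runs inside a single channel transaction, so a memory channel must allow at least batchSize events per transaction, and its total capacity must be at least the transaction capacity. A small checker sketch (the class and method names are mine, for illustration):

```java
public class ChannelSizing {
    // A Flume sink pulls up to batchSize events inside one channel
    // transaction, so the memory channel must satisfy:
    //   capacity >= transactionCapacity >= batchSize
    public static boolean isValid(int capacity, int transactionCapacity, int batchSize) {
        return capacity >= transactionCapacity && transactionCapacity >= batchSize;
    }

    public static void main(String[] args) {
        System.out.println(isValid(100000, 10000, 10000)); // true
        System.out.println(isValid(1000, 100, 10000));     // false
    }
}
```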
5) The application's logback configuration. The XML tags were mangled in this excerpt; below is a reconstruction from the surviving values (the appender names and class attributes are standard logback and inferred, and the JSON pattern is collapsed to one line per event with %n):
<configuration>
    <contextName>logback</contextName>
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>info</level>
        </filter>
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>
    <appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${logging.path}/debug_${sysName}.log</file>
        <encoder>
            <pattern>{"sysName":"${sysName}","thread":"%thread","exeDate":"%date{yyyy-MM-dd HH:mm:ss.SSS}","level":"%level","msg":"%msg"}%n</pattern>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/debug/debug_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <maxFileSize>${logMaxFileSize}</maxFileSize>
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>DEBUG</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${logging.path}/info_${sysName}.log</file>
        <encoder>
            <pattern>{"sysName":"${sysName}","thread":"%thread","exeDate":"%date{yyyy-MM-dd HH:mm:ss.SSS}","level":"%level","msg":"%msg"}%n</pattern>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/info/info_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <maxFileSize>${logMaxFileSize}</maxFileSize>
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>INFO</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${logging.path}/warn_${sysName}.log</file>
        <encoder>
            <pattern>{"sysName":"${sysName}","thread":"%thread","exeDate":"%date{yyyy-MM-dd HH:mm:ss.SSS}","level":"%level","msg":"%msg"}%n</pattern>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/warn/warn_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <maxFileSize>${logMaxFileSize}</maxFileSize>
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>WARN</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${logging.path}/error_${sysName}.log</file>
        <encoder>
            <pattern>{"sysName":"${sysName}","thread":"%thread","exeDate":"%date{yyyy-MM-dd HH:mm:ss.SSS}","level":"%level","msg":"%msg"}%n</pattern>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/error/error_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <maxFileSize>${logMaxFileSize}</maxFileSize>
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <!-- the root logger and appender-refs were not part of the original excerpt -->
</configuration>
To sum up: log events land first in info.log, warn.log, and error.log, and are rolled by size into the info, warn, and error directories. So we only need to watch the info, warn, and error directories.
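The appenders above emit one JSON object per log event, which is exactly what StringUtil.buildLog (not shown in the post) has to pick apart before the sink re-serializes the rows as JSONEachRow. A minimal, hypothetical sketch of that parsing, assuming field values contain no escaped double quotes:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogFieldExtractor {
    // Pull a single string field out of one logback JSON log line.
    // Assumes the value contains no escaped double quotes; a real
    // implementation should use a JSON parser such as fastjson.
    public static String field(String json, String key) {
        Matcher m = Pattern
            .compile("\"" + Pattern.quote(key) + "\"\\s*:\\s*\"([^\"]*)\"")
            .matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String line = "{\"sysName\":\"demo\",\"thread\":\"main\","
            + "\"exeDate\":\"2020-06-01 10:00:00.000\",\"level\":\"INFO\",\"msg\":\"started\"}";
        System.out.println(field(line, "level"));   // INFO
        System.out.println(field(line, "sysName")); // demo
    }
}
```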
For the full implementation, see https://gitee.com/lvmoney/zhy-frame-parent