Running our Spring Boot services on k8s: syncing data to ClickHouse with Flume

A brief introduction to ClickHouse (see the official documentation for details):

1) ClickHouse is not part of the Hadoop ecosystem.

2) It speaks SQL, so getting started is relatively easy for anyone familiar with relational databases.

3) ClickHouse is best used for reads; avoid mutating data, and perform writes in large batches (see the sketch after this list).

4) All kinds of log data can be synced into ClickHouse with Flume for centralized management and user-behavior analysis.

5) Incrementally syncing MySQL into ClickHouse suggests a design: immutable data such as system logs, transaction logs, user-behavior logs, and completed orders can be synced into ClickHouse for reporting, statistics, and analysis.

Since users mostly query and modify recent data, the mutable, transaction-bound part can stay in MySQL. With MySQL synced into ClickHouse, recent and changing data is handled in MySQL while the bulk of historical data is served by ClickHouse, relieving the relational database's performance bottleneck.

6) For a tangle of heterogeneous data sources, Flink could be used to funnel everything into ClickHouse.

All of the above needs to be tested against your actual workload; after all, practice is the sole criterion for testing truth.
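As a minimal illustration of point 3: writes should arrive as large batched INSERTs while mutations stay rare. The statements below are a sketch against the sys_log table created in step 1 further down (the values are made up):

INSERT INTO default.sys_log (id, sys_name, level, msg, thread, create_date, exe_date) VALUES
    ('1', 'demo', 'INFO', 'service started', 'main', now(), now()),
    ('2', 'demo', 'INFO', 'request handled', 'http-nio-1', now(), now());

-- Mutations are heavyweight asynchronous operations in ClickHouse; avoid patterns like:
-- ALTER TABLE default.sys_log UPDATE msg = 'edited' WHERE id = '1';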

A brief introduction to Flume (see the official documentation for details):

1) A highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive volumes of log data.

2) It can watch many kinds of files or directories for changes, pick up the changed data, and push it to various downstream data-receiving middleware.

3) Flume provides many plugins to cover the needs in 2); common examples include listening on a TCP port as a data source and watching a directory for new log files.

4) Flume can push the changed data into a queue that then distributes it to our various data-warehouse middleware, or it can skip the queue and write directly to the warehouse.

5) We can also implement a custom Flume NG component to satisfy special synchronization needs, which is exactly what this article does with a custom sink.

This article covers:

1. Implementing a custom Flume NG sink

2. Verifying that the logs our system produces are synced to ClickHouse through the sink from step 1

The core code follows.

1) Create the table in ClickHouse:

CREATE TABLE default.sys_log (
 `id` String,
 `sys_name` String,
 `level` String,
 `msg` String,
 `thread` String,
 `create_date` DateTime,
 `exe_date` DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_date)
ORDER BY exe_date;
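The sink below streams rows in ClickHouse's JSONEachRow format, so a quick manual check that the schema accepts such rows can save debugging later (the values are made up):

INSERT INTO default.sys_log FORMAT JSONEachRow {"id":"1","sys_name":"demo","level":"INFO","msg":"hello","thread":"main","create_date":"2020-01-01 00:00:00","exe_date":"2020-01-01 00:00:00"};

SELECT count() FROM default.sys_log;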

2) pom.xml for the custom Flume sink:

<!-- Reconstructed dependency section: the XML tags were lost in extraction.
     Dependencies without an explicit <version> are assumed to be managed by
     the parent POM's dependencyManagement. -->
<dependencies>
    <!-- Flume core -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
    </dependency>

    <!-- Flume configuration -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-configuration</artifactId>
    </dependency>

    <!-- Flume SDK -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
    </dependency>

    <dependency>
        <groupId>com.opencsv</groupId>
        <artifactId>opencsv</artifactId>
        <version>4.2</version>
    </dependency>

    <!-- ClickHouse JDBC driver -->
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
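Before Flume can instantiate the sink, the packaged jar must be on the agent's classpath. Flume's documented plugin layout is a plugins.d directory under the Flume home; the module name clickhouse-sink below is made up:

plugins.d/
└── clickhouse-sink/
    ├── lib/          # the jar built from this module
    └── libext/       # its runtime dependencies, e.g. clickhouse-jdbc-0.2.4.jar and fastjson

A shaded (fat) jar dropped into Flume's lib/ directory also works.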

3) The core class of the custom Flume sink:

package com.zhy.frame.newsql.clickhouse.sink.sink;

import com.google.common.base.Preconditions;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnectionImpl;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.domain.ClickHouseFormat;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Custom Flume sink that drains events from the channel in batches and writes
 * them to ClickHouse in JSONEachRow format.
 *
 * The configuration-key constants (HOST, PORT, USER, PASSWORD, DATABASE, TABLE,
 * BATCH_SIZE, their DEFAULT_* counterparts, CLICK_HOUSE_PREFIX,
 * MAX_PARALLEL_REPLICAS_VALUE) and the helpers StringUtil / JsonUtil are project
 * classes not shown in this article; see the repository linked at the end.
 */
public class ClickHouseSink extends AbstractSink implements Configurable {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSink.class);
    private BalancedClickhouseDataSource dataSource = null;
    private SinkCounter sinkCounter = null;
    private String host = null;
    private String port = null;
    private String user = null;
    private String password = null;
    private String database = null;
    private String table = null;
    private int batchSize;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        List<Object> insertData = new ArrayList<>();
        try {
            ClickHouseConnectionImpl conn = (ClickHouseConnectionImpl) dataSource.getConnection();
            // Drain up to batchSize events from the channel in one transaction.
            int count;
            for (count = 0; count < batchSize; ++count) {
                Event event = ch.take();
                if (event == null) {
                    break;
                }
                insertData.add(StringUtil.buildLog(new String(event.getBody(), StandardCharsets.UTF_8)));
            }
            if (count <= 0) {
                // Nothing in the channel: commit the empty transaction and back off.
                sinkCounter.incrementBatchEmptyCount();
                txn.commit();
                return Status.BACKOFF;
            } else if (count < batchSize) {
                sinkCounter.incrementBatchUnderflowCount();
            } else {
                sinkCounter.incrementBatchCompleteCount();
            }
            sinkCounter.addToEventDrainAttemptCount(count);
            // Bulk-write the batch via the JDBC driver's streaming API.
            // JsonUtil.t2JsonString is expected to produce newline-delimited JSON
            // objects, as the JSONEachRow format requires.
            ClickHouseStatement sth = conn.createStatement();
            sth.write()
                    .table(String.format(" %s.%s", database, table))
                    .data(new ByteArrayInputStream(JsonUtil.t2JsonString(insertData).getBytes(StandardCharsets.UTF_8)),
                            ClickHouseFormat.JSONEachRow)
                    .addDbParam(ClickHouseQueryParam.MAX_PARALLEL_REPLICAS, MAX_PARALLEL_REPLICAS_VALUE)
                    .send();
            sinkCounter.incrementEventDrainSuccessCount();
            status = Status.READY;
            txn.commit();
        } catch (Throwable t) {
            // Roll back so the events stay in the channel and are retried.
            txn.rollback();
            LOGGER.error(t.getMessage(), t);
            status = Status.BACKOFF;
            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            txn.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }

        // host, database and table are mandatory; everything else has a default.
        Preconditions.checkArgument(context.getString(HOST) != null && context.getString(HOST).length() > 0,
                "ClickHouse host must be specified!");
        this.host = context.getString(HOST);
        if (!this.host.startsWith(CLICK_HOUSE_PREFIX)) {
            this.host = CLICK_HOUSE_PREFIX + this.host;
        }

        Preconditions.checkArgument(context.getString(DATABASE) != null && context.getString(DATABASE).length() > 0,
                "ClickHouse database must be specified!");
        this.database = context.getString(DATABASE);
        Preconditions.checkArgument(context.getString(TABLE) != null && context.getString(TABLE).length() > 0,
                "ClickHouse table must be specified!");
        this.table = context.getString(TABLE);
        this.port = context.getString(PORT, DEFAULT_PORT);
        this.user = context.getString(USER, DEFAULT_USER);
        this.password = context.getString(PASSWORD, DEFAULT_PASSWORD);
        this.batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
    }

    @Override
    public void start() {
        LOGGER.info("clickHouse sink {} starting", getName());
        // host already carries the ClickHouse JDBC prefix added in configure().
        String jdbcUrl = String.format("%s:%s/%s", this.host, this.port, this.database);
        ClickHouseProperties properties = new ClickHouseProperties().withCredentials(this.user, this.password);
        this.dataSource = new BalancedClickhouseDataSource(jdbcUrl, properties);
        sinkCounter.start();
        super.start();
        LOGGER.info("clickHouse sink {} started", getName());
    }

    @Override
    public void stop() {
        LOGGER.info("clickHouse sink {} stopping", getName());
        sinkCounter.incrementConnectionClosedCount();
        sinkCounter.stop();
        super.stop();
        LOGGER.info("clickHouse sink {} stopped", getName());
    }
}
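The helper StringUtil.buildLog used in process() is not shown in the article. Below is a purely hypothetical minimal sketch, assuming the JSON log lines produced by the logback pattern in step 5 and fastjson (already a dependency); the field mapping follows the sys_log table, but the project's actual implementation lives in the repository:

// Hypothetical sketch only; not the project's real StringUtil.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public final class StringUtil {
    public static Map<String, Object> buildLog(String line) {
        // Input, one object per line from logback:
        // {"sysName":"...","thread":"...","exeDate":"...","level":"...","msg":"..."}
        JSONObject src = JSON.parseObject(line);
        Map<String, Object> row = new HashMap<>();
        row.put("id", UUID.randomUUID().toString());   // synthetic row id
        row.put("sys_name", src.getString("sysName"));
        row.put("level", src.getString("level"));
        row.put("msg", src.getString("msg"));
        row.put("thread", src.getString("thread"));
        row.put("exe_date", src.getString("exeDate")); // when the log was written
        // create_date records when the row was ingested
        row.put("create_date", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        return row;
    }
}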

4) The corresponding Flume configuration:

# Name the agent's components
a1.sources = r1
a1.sinks = sink1
a1.channels = c1

# Source: watch a spooling directory for completed log files
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/spark/flume/data/log
a1.sources.r1.fileHeader = false
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Sink: the custom ClickHouse sink from step 3
a1.sinks.sink1.type = com.zhy.frame.newsql.clickhouse.sink.sink.ClickHouseSink
a1.sinks.sink1.host = localhost
a1.sinks.sink1.port = 8123
a1.sinks.sink1.database = default
a1.sinks.sink1.table = sys_log
a1.sinks.sink1.batchSize = 10000
a1.sinks.sink1.user = default
a1.sinks.sink1.password =

# Channel: the sink takes up to batchSize events per channel transaction, so
# transactionCapacity must be >= batchSize (values like 1000/100 would
# overflow with batchSize = 10000)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Bind source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.sink1.channel = c1
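Assuming the configuration above is saved as conf/clickhouse.conf (the file name is arbitrary), the agent can be started with Flume's standard launcher:

bin/flume-ng agent --conf conf --conf-file conf/clickhouse.conf --name a1 -Dflume.root.logger=INFO,console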

5) The application's logback configuration:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Reconstructed logback.xml: the XML element tags of the original were lost in
  extraction, so the tag structure and appender names below are the standard
  logback layout implied by the surviving text. The properties sysName,
  logging.path, logMaxFileSize, logMaxHistory and CONSOLE_LOG_PATTERN are
  defined by the application; their definitions were not recoverable.
-->
<configuration>

    <contextName>logback</contextName>

    <!-- Console appender: human-readable output, INFO and above -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>info</level>
        </filter>
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- DEBUG file appender: one JSON object per event so Flume can ship each
         line to ClickHouse as JSONEachRow. Note %msg is not JSON-escaped, so
         messages containing quotes or newlines would need sanitizing. -->
    <appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${logging.path}/debug_${sysName}.log</file>
        <encoder>
            <pattern>
                {
                "sysName":"${sysName}",
                "thread":"%thread",
                "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                "level":"%level",
                "msg": "%msg"
                }
            </pattern>
        </encoder>
        <!-- Roll by day and by size into the debug/ subdirectory -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/debug/debug_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <!-- Accept exactly DEBUG, deny everything else -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>DEBUG</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- INFO file appender -->
    <appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${logging.path}/info_${sysName}.log</file>
        <encoder>
            <pattern>
                {
                "sysName":"${sysName}",
                "thread":"%thread",
                "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                "level":"%level",
                "msg": "%msg"
                }
            </pattern>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/info/info_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>INFO</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- WARN file appender -->
    <appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${logging.path}/warn_${sysName}.log</file>
        <encoder>
            <pattern>
                {
                "sysName":"${sysName}",
                "thread":"%thread",
                "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                "level":"%level",
                "msg": "%msg"
                }
            </pattern>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/warn/warn_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>WARN</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- ERROR file appender -->
    <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${logging.path}/error_${sysName}.log</file>
        <encoder>
            <pattern>
                {
                "sysName":"${sysName}",
                "thread":"%thread",
                "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                "level":"%level",
                "msg": "%msg"
                }
            </pattern>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/error/error_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- Attach all appenders to the root logger -->
    <root level="info">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="DEBUG_FILE"/>
        <appender-ref ref="INFO_FILE"/>
        <appender-ref ref="WARN_FILE"/>
        <appender-ref ref="ERROR_FILE"/>
    </root>

</configuration>

To sum up: log records first land in info.log, error.log, and warn.log, and are then rolled by size and date into the info, error, and warn subdirectories. Since Flume's spooling-directory source expects files that are complete and no longer being written to, watching those rolled subdirectories (rather than the live log files) is all we need; a configuration sketch follows.
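One spooling-directory source per level directory, all feeding the same channel and therefore the same ClickHouse sink. The /home/spark/logs base path is made up; substitute your own ${logging.path}:

a1.sources = r1 r2 r3

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/spark/logs/info
a1.sources.r1.channels = c1

a1.sources.r2.type = spooldir
a1.sources.r2.spoolDir = /home/spark/logs/warn
a1.sources.r2.channels = c1

a1.sources.r3.type = spooldir
a1.sources.r3.spoolDir = /home/spark/logs/error
a1.sources.r3.channels = c1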

For the complete implementation, see https://gitee.com/lvmoney/zhy-frame-parent