Bootstrap

k8s 上运行我们的 springboot 服务之——flume 读取kafka数据批量同步到clickhouse

业务中台是什么?

业务中台简单来讲,就是企业级功能复用平台,比如:淘宝下面有很多电商产品有toB、有toC其实他们用到的账号系统、交易系统、营销系统等,这些大模块都是通用的。如果每个团队都重新开发一套系统就是对资源的严重浪费。因此,有专门的团队负责开发这些通用的系统,再赋能给每个产品线,这样既做到资源的最大化重复利用,又可以将每条产品线的数据沉淀在一起。

我的整个k8s上运行我们的springboot也是基于这个目的。

数据中台是什么?

同样的如果每条产品线都配备数据分析、开发相关人员又是一种资源的浪费。

数据中台要做四个方面的工作分别是“采集”、“存储”、“打通”、“使用”。采集就是要采集各条业务线的业务数据、日志数据、用户行为数据等有用的数据。

存储就是要用更加科学的方式存储数据,一般采用三层建模的方式,让收集上来的数据形成公司的数据资产。打通就是要打通用户的行为数据和用户的业务数据,如电商用户的浏览、点击行为和用户的支付业务数据,就要做到打通。使用就是就打通的数据赋能业务人员、领导层进行决策,做到数据反哺业务。

大数据是什么?

是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产

三者关系简要说明

业务中台(系统)收集各种数据形成大数据,数据中台把业务中台收集的数据按照不同指标的标准归类分析转化存储到数据仓库并根据中台业务指标抽象成数据统一的数据接口。业务中台调用数据中台提供的数据分析接口为我们中台的服务服务。

我们这里主要讲把各个数据源的数据同步到kafka后用flume批量的把kafka的数据同步到clickhouse

自定义flume的sink主要用来批量保存数据到clickhouse,具体代码如下:

public class ClickHouseKafkaSink extends AbstractSink implements Configurable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseKafkaSink.class);
    private BalancedClickhouseDataSource dataSource = null;
    private SinkCounter sinkCounter = null;
    private String host = null;
    private String port = null;
    private String user = null;
    private String password = null;
    private String database = null;
    private String table = null;
    private int batchSize;
    private final DateTimeFormatter df = DateTimeFormatter.ofPattern(ClickHouseSinkConstant.DATE_FORMAT);

    private int intervalDate;
    Cache cache = Caffeine.newBuilder()
            .maximumSize(DEFAULT_CACHE_MAX_SIZE)
            .build();
    SnowflakeIdFactoryUtil idWorker = new SnowflakeIdFactoryUtil(1, 2);

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        // 声明事件
        Event event;
        txn.begin();
        try {
            handData();
            int count;
            for (count = 0; count < batchSize; ++count) {
                event = ch.take();
                if (event == null) {
                    break;
                }
                Long num = idWorker.nextId();
                String data = new String(event.getBody());
                cache.put(num.toString(), data);
                LOGGER.info(data);
            }
            if (count < batchSize) {
                sinkCounter.incrementBatchUnderflowCount();
            } else {
                sinkCounter.incrementBatchCompleteCount();
            }
            sinkCounter.addToEventDrainAttemptCount(count);
            handData();
            sinkCounter.incrementEventDrainSuccessCount();
            status = Status.READY;
            txn.commit();

        } catch (Throwable t) {
            txn.rollback();
            LOGGER.error(t.getMessage(), t);
            status = Status.BACKOFF;
            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            txn.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }
        this.intervalDate = context.getInteger(INTERVAL_DATE, DEFAULT_INTERVAL_DATE);
        Preconditions.checkArgument(context.getString(HOST) != null && context.getString(HOST).length() > 0, "ClickHouse host must be specified!");
        this.host = context.getString(HOST);
        if (!this.host.startsWith(CLICK_HOUSE_PREFIX)) {
            this.host = CLICK_HOUSE_PREFIX + this.host;
        }

        Preconditions.checkArgument(context.getString(DATABASE) != null && context.getString(DATABASE).length() > 0, "ClickHouse database must be specified!");
        this.database = context.getString(DATABASE);
        Preconditions.checkArgument(context.getString(TABLE) != null && context.getString(TABLE).length() > 0, "ClickHouse table must be specified!");
        this.table = context.getString(TABLE);
        this.port = context.getString(PORT, DEFAULT_PORT);
        this.user = context.getString(USER, DEFAULT_USER);
        this.password = context.getString(PASSWORD, DEFAULT_PASSWORD);
        this.batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_MAX_SIZE);
    }


    @Override
    public void start() {
        LOGGER.info("clickHouse sink {} starting", getName());
        String jdbcUrl = String.format("%s:%s/%s", this.host, this.port, this.database);
        ClickHouseProperties properties = new ClickHouseProperties().withCredentials(this.user, this.password);
        this.dataSource = new BalancedClickhouseDataSource(jdbcUrl, properties);
        sinkCounter.start();
        super.start();
        LOGGER.info("clickHouse sink {} started", getName());
    }


    @Override
    public void stop() {
        LOGGER.info("clickHouse sink {} stopping", getName());
        sinkCounter.incrementConnectionClosedCount();
        sinkCounter.stop();
        data2Db();
        super.stop();
        LOGGER.info("clickHouse sink {} stopped", getName());
    }

    /**
     * 处理数据的逻辑处理
     * 1:当缓存的size大于 配置的batchSize,同步缓存数据
     * 2:最新更新时间比现在超过DEFAULT_INTERVAL_DATE(默认8秒),同步缓存数据
     *
     * @throws
     * @return: void
     * @author: lvmoney /XXXXXX科技有限公司
     * @date: 2020/7/22 9:02
     */
    private void handData() {
        if (cache.asMap().size() >= batchSize) {
            data2Db();
        } else {
            String lastDate = (String) cache.get(DEFAULT_CACHE_DATE_KAFKA, k -> LocalDateTime.now().format(df));
            String now = LocalDateTime.now().format(df);
            Long interval = ChronoUnit.MILLIS.between(
                    LocalDateTime.parse(lastDate, df).atZone(
                            ZoneId.of(DEFAULT_ZONE_ID)).toInstant(),
                    LocalDateTime.parse(now, df).atZone(
                            ZoneId.of(DEFAULT_ZONE_ID)).toInstant());
            if (interval >= intervalDate) {
                data2Db();
            }
        }
    }

    /**
     * 同步数据并且更新同步时间
     *
     * @throws
     * @return: void
     * @author: lvmoney /XXXXXX科技有限公司
     * @date: 2020/7/22 9:04
     */
    private void data2Db() {
        List insertData = new ArrayList<>();
        cache.asMap().forEach((k, v) -> {
            if (!k.equals(DEFAULT_CACHE_DATE_KAFKA)) {
                insertData.add(StringUtil.buildKafka(JsonUtil.t2JsonString(v)));
            }
        });
        if (insertData == null || insertData.size() == 0) {
            return;
        }
        ClickHouseConnectionImpl conn = null;
        try {
            conn = (ClickHouseConnectionImpl) dataSource.getConnection();
            ClickHouseStatement sth = conn.createStatement();
            LOGGER.info("data:=" + JsonUtil.t2JsonString(insertData));
            sth.write().table(String.format(" %s.%s", database, table)).data(new ByteArrayInputStream(JsonUtil.t2JsonString(insertData).getBytes()), ClickHouseFormat.JSONEachRow).addDbParam(ClickHouseQueryParam.MAX_PARALLEL_REPLICAS, MAX_PARALLEL_REPLICAS_VALUE).send();
        } catch (SQLException e) {
            LOGGER.error("同步数据到clickhouse报错:{}", e);
        } finally {
            try {
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                LOGGER.error("关闭clickhouse Connection报错:{}", e);
            }
        }

        cache.put(DEFAULT_CACHE_DATE_KAFKA, LocalDateTime.now().format(df));
        cache.invalidateAll();
    }

}

flume 配置:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source


a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 4
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 10.20.128.235:9092
a1.sources.r1.kafka.topics = test

# Describe the sink
a1.sinks.k1.type = com.zhy.frame.newsql.clickhouse.sink.sink.ClickHouseKafkaSink
#当数据list达到8开始同步
a1.sinks.k1.batchSize = 8
a1.sinks.k1.host = localhost
a1.sinks.k1.port = 8123
a1.sinks.k1.database = default
a1.sinks.k1.table = sys_log
#数据同步间隔时间
al.sinks.k1.intervalDate = 8000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

kafka里面的数据需要是json格式并且字段和clickhouse里面字段一致。为了达到批量保存的效果,我们使用了缓存来存放数据,在数据量或者时间间隔的阀值达到后批量同步到clickhouse