Press "Enter" to skip to content

Flink的一些实践示例

package com.xiaocaicai.fs.flink;

import com.xiaocaicai.fs.flink.utils.GSON;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.time.Instant;
import java.time.ZoneId;
import java.util.Calendar;
import java.util.Properties;
import java.util.TimeZone;

/**
 * Example Flink streaming job that consumes JSON API-call logs from the Kafka topic
 * {@code apiLog}, keeps only provider-side records from the {@code production} environment,
 * and prints three statistics to standard output:
 *
 * <ol>
 *   <li>call count per interface method over a sliding one-minute window (updated every 5 s);</li>
 *   <li>call count and average elapsed time per application, once per minute;</li>
 *   <li>a running call count for the current calendar day, once per minute.</li>
 * </ol>
 */
public class KafkaFlinkIntegration {

    /**
     * Builds the three pipelines and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or fails while executing
     */
    public static void main(String[] args) throws Exception {
        // Kafka connection properties.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.1.200:9092");
        properties.setProperty("group.id", "devour-flink");

        // Flink streaming environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source delivering each message as a raw string.
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
                "apiLog",
                new SimpleStringSchema(),
                properties);

        // Shared upstream: parse JSON, then drop unparseable and irrelevant records.
        DataStream<LogRecord> stream = env
                .addSource(kafkaSource)
                .map(new MapFunction<String, LogRecord>() {
                    @Override
                    public LogRecord map(String value) throws Exception {
                        return parseJson(value);
                    }
                })
                .filter(new FilterFunction<LogRecord>() {
                    @Override
                    public boolean filter(LogRecord record) throws Exception {
                        // parseJson yields null for payloads it could not parse.
                        if (record == null) {
                            return false;
                        }
                        return "production".equals(record.env) && record.providerSide;
                    }
                });

        // 1) Call count per interface method.
        // The '#' separator keeps distinct (interfaceId, methodName) pairs from collapsing
        // into the same key when their plain concatenation happens to be identical.
        stream.keyBy(record -> record.interfaceId + "#" + record.methodName)
                // One-minute window that slides (and therefore emits) every 5 seconds.
                .window(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(5)))
                .sum("count")
                .addSink(new SinkFunction<LogRecord>() {
                    @Override
                    public void invoke(LogRecord value, Context context) throws Exception {
                        System.out.println(value.appName + " ----> " + value.interfaceId + " ----> " + value.count);
                    }
                });

        // 2) Call count and average elapsed time per application.
        stream.keyBy(record -> record.appName)
                // Size == slide, so the windows do not overlap: one result per key per minute.
                .window(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.minutes(1)))
                .aggregate(
                        // Accumulator: (sum of elapsedTime, record count). Result: (average, count).
                        new AggregateFunction<LogRecord, Tuple2<Long, Integer>, Tuple2<Double, Integer>>() {
                            @Override
                            public Tuple2<Long, Integer> createAccumulator() {
                                return new Tuple2<>(0L, 0);
                            }

                            @Override
                            public Tuple2<Long, Integer> add(LogRecord value, Tuple2<Long, Integer> accumulator) {
                                return new Tuple2<>(accumulator.f0 + value.elapsedTime, accumulator.f1 + 1);
                            }

                            @Override
                            public Tuple2<Double, Integer> getResult(Tuple2<Long, Integer> accumulator) {
                                // Guard against division by zero for an empty accumulator.
                                return new Tuple2<>(accumulator.f1 == 0 ? 0.0 : (double) accumulator.f0 / accumulator.f1, accumulator.f1);
                            }

                            @Override
                            public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {
                                return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                            }
                        },
                        // Attaches the key (application name) to the pre-aggregated result.
                        new ProcessWindowFunction<Tuple2<Double, Integer>, String, String, TimeWindow>() {
                            @Override
                            public void process(String key, Context context, Iterable<Tuple2<Double, Integer>> averages, Collector<String> out) {
                                // Only the first element is read; with incremental aggregation the
                                // iterable is expected to carry the single aggregated result.
                                Tuple2<Double, Integer> average = averages.iterator().next();
                                out.collect("AppName: " + key + " Count: " + average.f1 +  " Average: " + average.f0);
                            }
                        }
                )
                .addSink(new SinkFunction<String>() {
                    @Override
                    public void invoke(String value, Context context) throws Exception {
                        System.out.println("应用统计及延迟:" + value);
                    }
                });

        // 3) Running call count for the current calendar day, updated once per minute.
        stream
                .windowAll(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.minutes(1)))
                .apply(new AllWindowFunction<LogRecord, String, TimeWindow>() {
                    // Both fields are plain instance fields, not Flink managed state.
                    // NOTE(review): the running total is therefore presumably lost if the job
                    // restarts - move to managed state if that matters.
                    private long count = 0;
                    // Local calendar day (epoch-day number) the counter currently belongs to.
                    private long currentDay = Long.MIN_VALUE;

                    @Override
                    public void apply(TimeWindow window, Iterable<LogRecord> values, Collector<String> out) throws Exception {
                        // Attribute the window to the day it starts in and reset whenever that
                        // day changes. Comparing days (instead of testing for exactly 00:00)
                        // keeps the 23:59-00:00 window in the old day and still resets when no
                        // window fires right at midnight because no records arrived.
                        long windowDay = localEpochDay(window.getStart());
                        if (windowDay != currentDay) {
                            currentDay = windowDay;
                            count = 0;
                        }
                        for (LogRecord ignored : values) {
                            count++;
                        }

                        out.collect("ALL DAY COUNT: " + count);
                    }
                })
                .addSink(new SinkFunction<String>() {
                    @Override
                    public void invoke(String value, Context context) throws Exception {
                        System.out.println("全量统计:" + value);
                    }
                });

        // Submit and run the job.
        env.execute("Kafka Flink Integration");
    }

    /**
     * Converts an epoch timestamp to the calendar day it falls on in the system default
     * time zone.
     *
     * @param timestampMillis milliseconds since the Unix epoch
     * @return the epoch-day number of the local date containing the timestamp
     */
    private static long localEpochDay(long timestampMillis) {
        return Instant.ofEpochMilli(timestampMillis)
                .atZone(ZoneId.systemDefault())
                .toLocalDate()
                .toEpochDay();
    }

    /**
     * Parses one JSON log line into a {@link LogRecord} using the project's {@code GSON} helper.
     *
     * <p>Parsing is best-effort: a failure is reported on standard error and the record is
     * skipped rather than failing the job.
     *
     * @param json raw JSON text of a single log record
     * @return the parsed record, or {@code null} if parsing threw an exception
     */
    private static LogRecord parseJson(String json) {
        try {
            return GSON.fromJson(json, LogRecord.class);
        } catch (Exception e) {
            System.err.println("Failed to parse log record as JSON; skipping it");
            e.printStackTrace();
            return null;
        }
    }

}

用例介绍:

本示例从 Kafka 消费接口调用日志,先对数据进行过滤,然后从以下 3 个维度进行统计分析:

1.按照接口的维度统计调用量

2.按照应用的维度统计调用量和平均延迟

3.按照自然日来统计调用量

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注