Press "Enter" to skip to content

Flink消费kafka,对同一个数据源进行多维度的实时分析处理

我的诉求是,我现在kafka上面有个实时是数据流,他是一个接口的请求日志。

我现在想要实时分析出来每个接口的访问频次和延迟,我还想按照系统的维度统计访问频次和延迟。

就是说我想对同一个数据源进行初始过滤,然后对数据流进行分支,然后分别计算我想要的数据。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
    "logdata",
    new SimpleStringSchema(),
    properties);

DataStream<String> stream = env.addSource(consumer);

DataStream<YourRecordType> parsedStream = stream
    .map(new YourJsonDeserializer());

DataStream<YourRecordType> filteredStream = parsedStream
    .filter(record -> "production".equals(record.getEnv()) && record.isProviderSide());
// 统一过滤

// 分支一:按 interfaceId + methodName 统计
filteredStream
    .keyBy(record -> record.getInterfaceId() + record.getMethodName())
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new YourCountAggregateFunction())
    .addSink(new YourOutputSink());

// 分支二:按 appName 统计
filteredStream
    .keyBy(record -> record.getAppName())
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new YourAppAggregateFunction())
    .addSink(new YourOutputSink());

env.execute("Kafka Flink Integration");
发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注