Press "Enter" to skip to content

python实现一个kafka随机数据生成服务,容器部署

最近要实现一个基于kafka的流数据处理,需要一个本地的kafka开发环境,有一些在线的测试数据。

项目结构:

[root@localhost KafkaProducer]# ls
config.ini  docker-compose.yml  Dockerfile  main.py

main.py

import os
import time
import random
import json
import threading
from kafka import KafkaProducer
from configparser import ConfigParser

def read_data(directory):
    """从指定目录读取数据"""
    files = [os.path.join(directory, f) for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
    data_contents = [open(file, 'r').read() for file in files]
    if not data_contents:
        raise ValueError(f"未在 {directory} 中找到数据")
    return data_contents

def broadcast_data(producer, topic, data, rate):
    """广播数据到指定主题"""
    while True:
        message = random.choice(data)
        producer.send(topic, value=message)
        time.sleep(1 / rate)

# 读取配置文件
config = ConfigParser()
config.read('config.ini')

# Kafka配置
kafka_server = os.environ.get('KAFKA_BROKER_LIST', config.get('kafka', 'server'))

# 连接到Kafka
producer = KafkaProducer(bootstrap_servers=[kafka_server],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# 为每个主题创建一个线程
threads = []
for section in config.sections():
    if section != 'kafka':
        data_directory = config.get(section, 'data_directory')
        write_rate = config.getfloat(section, 'write_rate')
        data = read_data(data_directory)
        thread = threading.Thread(target=broadcast_data, args=(producer, section, data, write_rate))
        threads.append(thread)
        thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

config.ini

[kafka]
server = localhost:9092

[topic1]
data_directory = /data/topic1
write_rate = 1.0

[topic2]
data_directory = /data/topic2
write_rate = 2.0

Dockerfile

FROM python:3.8

# 安装kafka-python
RUN pip install kafka-python configparser

# 将脚本添加到容器
COPY main.py /main.py
COPY config.ini /config.ini

# 运行脚本
CMD ["python", "/main.py"]

docker-compose.yml

version: '3'
services:
  kafka-producer:
    image: harbor.xiaocaicai.com/public/kafka-producer
    build: .
    volumes:
      - /home/volume/kafka-producer/config.ini:/config.ini
      - /home/volume/kafka-producer/data:/data

编译好镜像直接运行即可,配置指定好测试数据位置和kafka位置。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注