Recently I needed to implement Kafka-based stream processing, which calls for a local Kafka development environment with a live feed of test data. The small producer below reads sample files from disk and broadcasts them to Kafka topics at a configurable rate.
Project structure:
[root@localhost KafkaProducer]# ls
config.ini docker-compose.yml Dockerfile main.py
main.py
import os
import time
import random
import json
import threading
from kafka import KafkaProducer
from configparser import ConfigParser
def read_data(directory):
    """Read every regular file in the directory; each file is one candidate message."""
    files = [os.path.join(directory, f) for f in os.listdir(directory)
             if os.path.isfile(os.path.join(directory, f))]
    data_contents = []
    for file in files:
        # Use a context manager so file handles are closed instead of leaked
        with open(file, 'r') as fh:
            data_contents.append(fh.read())
    if not data_contents:
        raise ValueError(f"No data found in {directory}")
    return data_contents

def broadcast_data(producer, topic, data, rate):
    """Send a randomly chosen message to the topic, `rate` messages per second."""
    while True:
        message = random.choice(data)
        producer.send(topic, value=message)
        time.sleep(1 / rate)

# Read the config file
config = ConfigParser()
config.read('config.ini')

# Kafka configuration: KAFKA_BROKER_LIST in the environment overrides config.ini
kafka_server = os.environ.get('KAFKA_BROKER_LIST', config.get('kafka', 'server'))

# Connect to Kafka
producer = KafkaProducer(bootstrap_servers=[kafka_server],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Start one broadcasting thread per topic; every section except [kafka] is a topic
threads = []
for section in config.sections():
    if section != 'kafka':
        data_directory = config.get(section, 'data_directory')
        write_rate = config.getfloat(section, 'write_rate')
        data = read_data(data_directory)
        thread = threading.Thread(target=broadcast_data,
                                  args=(producer, section, data, write_rate))
        threads.append(thread)
        thread.start()

# Wait for all threads (they loop forever, so this keeps the process alive)
for thread in threads:
    thread.join()
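To check that messages are actually flowing, a throwaway consumer works; this is not part of the project, just a verification sketch using kafka-python's KafkaConsumer, with topic1 and localhost:9092 taken from the config.ini below:
import json
from kafka import KafkaConsumer

# Read topic1 from the beginning and print every message as it arrives
consumer = KafkaConsumer('topic1',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         value_deserializer=lambda v: json.loads(v.decode('utf-8')))
for record in consumer:
    print(record.topic, record.value)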
config.ini
[kafka]
server = localhost:9092
[topic1]
data_directory = /data/topic1
write_rate = 1.0
[topic2]
data_directory = /data/topic2
write_rate = 2.0
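Note that read_data treats each file under data_directory as one complete message, so the mounted data volume might look like this (the file names are just examples):
/home/volume/kafka-producer/data
├── topic1
│   ├── sample1.json
│   └── sample2.json
└── topic2
    └── sample1.json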
Dockerfile
FROM python:3.8
# Install the Kafka client (configparser is already in the Python 3 standard library)
RUN pip install kafka-python
# Copy the script and config into the container
COPY main.py /main.py
COPY config.ini /config.ini
# Run the script
CMD ["python", "/main.py"]
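Building and pushing the image by hand would look like this; the registry tag comes from the docker-compose.yml below:
docker build -t harbor.xiaocaicai.com/public/kafka-producer .
docker push harbor.xiaocaicai.com/public/kafka-producer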
docker-compose.yml
version: '3'
services:
  kafka-producer:
    image: harbor.xiaocaicai.com/public/kafka-producer
    build: .
    volumes:
      - /home/volume/kafka-producer/config.ini:/config.ini
      - /home/volume/kafka-producer/data:/data
Once the image is built, just run it; config.ini specifies the test data locations and the Kafka broker address.
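The compose file above only defines the producer and assumes a broker is already reachable at the address in config.ini. To keep the whole environment in one place, a single-node broker can be added under the same services: key. A minimal sketch, assuming the bitnami/kafka image in KRaft mode; the KAFKA_CFG_* variable names come from that image's documentation, so verify them against the tag you actually pull:
  kafka:
    image: bitnami/kafka:3.6
    ports:
      - "9092:9092"
    environment:
      # Single-node KRaft: this node acts as both controller and broker
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
With that in place, docker compose up -d starts both services; setting KAFKA_BROKER_LIST=kafka:9092 in the producer's environment lets main.py override config.ini and reach the broker by its service name.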