Spark运行Python脚本写Kafka样例

先撇开spark,咱们先看下在python里怎么往kafka里写数据,这里用的是一个包是kafka-python,和java版的kafka-client一样,它是python版的kafka客户端,官方地址是:kafka-python

环境准备

  • 安装kafka-python包(方式一)
pip install kafka-python // 前提是机器上你已经安装了pip
tar xzvf kafka-python-1.4.3.tar.gz // 解压
cd kafka-python-1.4.3 // 切换到包目录
python setup.py install // 安装

使用方式

  • KafkaProducer
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['localhost:port'])

# Asynchronous by default
future = producer.send('my_topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)
  • KafkaConsumer
from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:port'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

接着我们看下在spark中,如何运行python脚本,并把数据写入kafka

Local 模式

  • 环境准备
// 参见上面,保证spark-client使用的python环境里安装上kafka-python包即可
  • 提交命令
spark/bin/spark-submit \
--master local[4] \
demo.py

所以在本地模式中,我们只要保证本地用到的python环境里面安装上kafka-python就行了,提交命令上没有啥改变,是不是非常简单

Standalone or Yarn 模式

  • 环境准备
tar xzvf kafka-python-1.4.3.tar.gz // 解压
cd kafka-python-1.4.3 // 切换到包目录
zip -r kafka.zip ./kafka // 把kafka这个源码包打包成zip
  • 提交命令
spark/bin/spark-submit \
--master spark://192.168.1.1:8888 \
--driver-memory 1G --executor-cores 4 --driver-cores 1 --total-executor-cores 120 --executor-memory 4G \
--py-files /home/work/kafka.zip \
demo.py

在集群模式中,因为最终任务会被提交到集群的机器上执行,依赖的是集群机器上的python环境,所以仅仅本地安装kafka-python包是没有作用的,必须通过--py-files参数把本地依赖的包带上去

补充说明:spark在运行python脚本时,常常需要带上一些依赖包,这里简单说明一下

依赖单个文件

  • 使用sc.addPyFiles(path)函数,代码里实现文件添加到SparkContext中
sc = SparkContext()
sc.addPyFile(file_path)
  • 命令中使用--py-files参数指定文件路径
spark/bin/spark-submit \
--master spark://192.168.1.1:8888 \
--py-files dependency.py \
demo.py

依赖自己写的其他模块

将自己写的模块打成一个zip包,然后当做单个文件使用。假设你有一个自己写的包叫util,里面有两个文件:init.py和util.py,然后你的demo.py里需要from util import util,你就可以直接打成util.zip包使用

zip -r util.zip ./util // 打包成zip
  • 同样使用sc.addPyFiles(path)函数,代码里实现文件添加到SparkContext中
sc = SparkContext()
sc.addPyFile(file_path)
  • 或者是命令中使用--py-files参数指定文件路径
spark/bin/spark-submit \
--master spark://192.168.1.1:8888 \
--py-files util.zip \
demo.py

依赖三方模块

像上文中说的kafka-python就是一个三方的包,但是这个包提供了源码,这个时候就可以直接把源码打包成zip,然后像调用自己写的模块包一样引用进去

  • 能下载到依赖的tar包
    直接找到源码,然后打包使用
tar xzvf kafka-python-1.4.3.tar.gz // 解压
cd kafka-python-1.4.3 // 切换到包目录
zip -r kafka.zip ./kafka // 把kafka这个源码包打包成zip
  • 其他方式安装
    这种一般看不到源码包,譬如执行pip install numpy。我们知道,第三方的包一般会被安装到$python_path/lib/python2/site-packages目录下面,这时就可以到这个目录下,把需要的包打包成zip。然后通过上面说的方式使用

注意:--py-files参数是可以加入多个文件的,多个文件之间以逗号分隔即可

spark/bin/spark-submit \
--master spark://192.168.1.1:8888 \
--py-files dependency1.zip,dependency2.zip,dependency.py \
demo.py

推荐阅读更多精彩内容