python3操作kafka sasl_ssl客户端程序

最近由于需要测试一些数据,而 kafka 的服务器是带有 SSL 加密和 SASL 认证的,所以写了个小的程序。简化后的程序放这里。

由于在加解密数据时的,需要使用 jks 证书文件,这个 jks 只有 java 可以读取,所以需要从 jks 中提取根证书,命令参数介绍如下。

pass123 为 jks 证书文件密码, 若提示 keytool 命令找不到,请安装 jdkjdk 包中包含了这个命令。

1keytool -exportcert -alias caroot -keystore kafka.client.keystore.jks -rfc -file certificate.pem -storepass pass123
2keytool -v -importkeystore -srckeystore kafka.client.keystore.jks -srcalias caroot -destkeystore cert_and_key.p12 -deststoretype PKCS12 -storepass pass123 -srcstorepass pass123
3openssl pkcs12 -in cert_and_key.p12 -nodes -nocerts -out key.pem -passin pass:pass123
4keytool -exportcert -alias caroot -keystore kafka.client.keystore.jks -rfc -file CARoot.pem -storepass pass123

命令完成,除 kafka.client.keystore.jks 外,会生成以下文件:

1CARoot.pem
2cert_and_key.p12
3certificate.pem
4kafka.client.keystore.jks
5key.pem

消息生产者:

 1import time
 2from kafka import KafkaProducer
 3import ssl
 4
 5kafka_server = 'kafka01.mylass.com:9093'  # 多个节点,可以写成 list
 6# kafka_server = ['172.19.169.169:9093','172.19.169.170:9093','172.19.169.171:9093']
 7topic_name = 'ssl-test'
 8
 9ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)  # windows ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number
10ctx.set_ciphers('ALL:@SECLEVEL=0')  # windows ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number
11
12producer = KafkaProducer(
13    bootstrap_servers = kafka_server,
14    acks = 'all',  # ack 类型
15    security_protocol = 'SASL_SSL',
16    sasl_mechanism = 'SCRAM-SHA-512',  # 取决于你的服务器用了什么参数,保持一致
17    ssl_check_hostname = False,
18    ssl_certfile = 'certificate.pem',  # 经过测试,只配置ca证书即可,这个证书可以不配置
19    ssl_cafile = 'CARoot.pem',  # 经过测试,只配置ca证书即可
20    ssl_context = ctx,  # windows ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number
21    sasl_plain_username = 'username',
22    sasl_plain_password = 'password'
23)
24
25for i in range(5):
26    s0 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
27    s1 = str(s0)+' messages: ' + str(i)
28    future1 = producer.send(topic_name, s1.encode('utf-8'))  #发送消息
29    producer.flush()
30    record = future1.get(timeout=10)
31    print(record)

消息消费者:

 1from kafka import KafkaConsumer
 2import ssl
 3
 4kafka_server = 'kafka01.mylass.com:9093'  # 多个节点,可以写成 list
 5# kafka_server = ['172.19.169.169:9093','172.19.169.170:9093','172.19.169.171:9093']
 6topic = 'ssl-test'
 7groupid = 'sslgroup1'
 8
 9ctx  = ssl.SSLContext(ssl.PROTOCOL_TLS)  # windows ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number
10ctx.set_ciphers('ALL:@SECLEVEL=0')  # windows ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number
11
12consumer = KafkaConsumer(
13    topic,
14    group_id = groupid,
15    bootstrap_servers = kafka_server,
16    security_protocol = 'SASL_SSL',
17    sasl_mechanism = 'SCRAM-SHA-512',  # 取决于你的服务器用了什么参数,保持一致
18    sasl_plain_username = 'username',
19    sasl_plain_password = 'password',
20    ssl_check_hostname = False,
21    ssl_cafile = 'CARoot.pem',
22    ssl_certfile = 'certificate.pem',
23    ssl_context = ctx  # windows ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number
24)
25for msg in consumer:
26    print("partition = %d, offset = %d" % (msg.partition, msg.offset))  # 元数据,可根据情况获取
27    print("value = %s" % msg.value.decode()) # 消息体