python3操作kafka sasl_ssl客户端程序
最近由于需要测试一些数据,而 kafka 的服务器是带有 SSL 加密和 SASL 认证的,所以写了个小的程序。简化后的程序放这里。
由于在加解密数据时的,需要使用 jks 证书文件,这个 jks 只有 java 可以读取,所以需要从 jks 中提取根证书,命令参数介绍如下。
pass123 为 jks 证书文件密码, 若提示
keytool命令找不到,请安装jdk,jdk包中包含了这个命令。
1keytool -exportcert -alias caroot -keystore kafka.client.keystore.jks -rfc -file certificate.pem -storepass pass123
2keytool -v -importkeystore -srckeystore kafka.client.keystore.jks -srcalias caroot -destkeystore cert_and_key.p12 -deststoretype PKCS12 -storepass pass123 -srcstorepass pass123
3openssl pkcs12 -in cert_and_key.p12 -nodes -nocerts -out key.pem -passin pass:pass123
4keytool -exportcert -alias caroot -keystore kafka.client.keystore.jks -rfc -file CARoot.pem -storepass pass123
命令完成,除 kafka.client.keystore.jks 外,会生成以下文件:
1CARoot.pem
2cert_and_key.p12
3certificate.pem
4kafka.client.keystore.jks
5key.pem
消息生产者:
1import time
2from kafka import KafkaProducer
3import ssl
4
5kafka_server = 'kafka01.mylass.com:9093' # 多个节点,可以写成 list
6# kafka_server = ['172.19.169.169:9093','172.19.169.170:9093','172.19.169.171:9093']
7topic_name = 'ssl-test'
8
9ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) # windows ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number
10ctx.set_ciphers('ALL:@SECLEVEL=0') # windows ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number
11
12producer = KafkaProducer(
13 bootstrap_servers = kafka_server,
14 acks = 'all', # ack 类型
15 security_protocol = 'SASL_SSL',
16 sasl_mechanism = 'SCRAM-SHA-512', # 取决于你的服务器用了什么参数,保持一致
17 ssl_check_hostname = False,
18 ssl_certfile = 'certificate.pem', # 经过测试,只配置ca证书即可,这个证书可以不配置
19 ssl_cafile = 'CARoot.pem', # 经过测试,只配置ca证书即可
20 ssl_context = ctx, # windows ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number
21 sasl_plain_username = 'username',
22 sasl_plain_password = 'password'
23)
24
25for i in range(5):
26 s0 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
27 s1 = str(s0)+' messages: ' + str(i)
28 future1 = producer.send(topic_name, s1.encode('utf-8')) #发送消息
29 producer.flush()
30 record = future1.get(timeout=10)
31 print(record)
消息消费者:
1from kafka import KafkaConsumer
2import ssl
3
4kafka_server = 'kafka01.mylass.com:9093' # 多个节点,可以写成 list
5# kafka_server = ['172.19.169.169:9093','172.19.169.170:9093','172.19.169.171:9093']
6topic = 'ssl-test'
7groupid = 'sslgroup1'
8
9ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) # windows ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number
10ctx.set_ciphers('ALL:@SECLEVEL=0') # windows ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number
11
12consumer = KafkaConsumer(
13 topic,
14 group_id = groupid,
15 bootstrap_servers = kafka_server,
16 security_protocol = 'SASL_SSL',
17 sasl_mechanism = 'SCRAM-SHA-512', # 取决于你的服务器用了什么参数,保持一致
18 sasl_plain_username = 'username',
19 sasl_plain_password = 'password',
20 ssl_check_hostname = False,
21 ssl_cafile = 'CARoot.pem',
22 ssl_certfile = 'certificate.pem',
23 ssl_context = ctx # windows ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number
24)
25for msg in consumer:
26 print("partition = %d, offset = %d" % (msg.partition, msg.offset)) # 元数据,可根据情况获取
27 print("value = %s" % msg.value.decode()) # 消息体