Python 3 client connecting to SSL Kafka: "wrong version number" error

For security reasons, Kafka traffic over the public network must be encrypted. The Java client worked without problems, but the Python client ran into an error.

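The Python client cannot use a JKS certificate file directly (that format is Java-specific), so PEM files are used instead, as in the example at the end of this post.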

The error message was as follows:

Traceback (most recent call last):
  File "m:/mm/kfk/ssl_producer.py", line 31, in <module>
    sasl_plain_password='pangoo.redis@sync1'
  File "M:\mm\lib\site-packages\kafka\producer\kafka.py", line 383, in __init__
    **self.config)
  File "M:\mm\lib\site-packages\kafka\client_async.py", line 244, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "M:\mm\lib\site-packages\kafka\client_async.py", line 909, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
  File "M:\mm\lib\site-packages\kafka\conn.py", line 1238, in check_version
    if not self.connect_blocking(timeout_at - time.time()):
  File "M:\mm\lib\site-packages\kafka\conn.py", line 340, in connect_blocking
    self.connect()
  File "M:\mm\lib\site-packages\kafka\conn.py", line 429, in connect
    if self._try_handshake():
  File "M:\mm\lib\site-packages\kafka\conn.py", line 508, in _try_handshake
    self._sock.do_handshake()
  File "C:\Users\ainy\AppData\Local\Programs\Python\Python37\lib\ssl.py", line 1139, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:1091)
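
The traceback shows the failure happens during the TLS handshake, before any Kafka request is sent. A quick way to check the handshake in isolation is to try it with only the standard library. This is a minimal sketch, not part of the original fix: `probe_tls` is a hypothetical helper name, and the host and port you pass in are whatever your broker listens on.

```python
import socket
import ssl


def probe_tls(host, port, timeout=5.0):
    """Try a TLS handshake and return 'ok: <version>' or 'failed: <error>'."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # Diagnostic only: certificate checks are disabled so that only the
    # handshake itself is exercised. Do not reuse this context for real traffic.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with ctx.wrap_socket(sock, server_hostname=host) as tls:
                return "ok: " + str(tls.version())
    except OSError as exc:  # ssl.SSLError and socket timeouts are OSError subclasses
        return "failed: " + repr(exc)
```

Calling `probe_tls('localhost', 9093)` against the listener used later in this post would return either the negotiated protocol version or the text of the handshake error, which helps separate handshake problems from certificate or SASL problems.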

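At first I suspected the certificate. After some searching, the fix turned out to be different: on Windows, the protocol version has to be specified explicitly.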

Initialize an ssl.SSLContext that allows insecure ciphers, then pass ctx as the ssl_context argument to the KafkaConsumer or KafkaProducer constructor:

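import ssl
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)  # negotiate the TLS version with the broker
ctx.set_ciphers('ALL:@SECLEVEL=0')      # allow all ciphers at OpenSSL security level 0 (insecure)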

A complete example of passing it to Kafka:

from kafka import KafkaProducer
import ssl

# Context that negotiates the TLS version and allows weak ciphers (insecure)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
ctx.set_ciphers('ALL:@SECLEVEL=0')

producer = KafkaProducer(
    bootstrap_servers='localhost:9093',
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    ssl_check_hostname=False,
    ssl_certfile='certificate.pem',
    ssl_cafile='CARoot.pem',
    # Pass the context to the Kafka producer or consumer (a producer here).
    # Per the kafka-python docs, the other ssl_* options are ignored when ssl_context is set.
    ssl_context=ctx,
    sasl_plain_username='username',
    sasl_plain_password='04UMwt9LUa9MJF6z'
)
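
According to the kafka-python documentation, when `ssl_context` is supplied, all other `ssl_*` options are ignored. A bare `ssl.SSLContext(ssl.PROTOCOL_TLS)` also does not verify the broker certificate by default. If verification is wanted, the PEM files have to be loaded into the context itself. The following is a minimal sketch under those assumptions: `build_kafka_ssl_context` and its parameters are hypothetical names, not part of kafka-python, and the `@SECLEVEL` syntax assumes Python is linked against OpenSSL 1.1.0 or later.

```python
import ssl


def build_kafka_ssl_context(cafile=None, certfile=None, keyfile=None,
                            check_hostname=True, seclevel=None):
    """Build a client-side SSLContext that verifies the broker certificate."""
    # PROTOCOL_TLS_CLIENT enables certificate and hostname verification by default.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.check_hostname = check_hostname
    if cafile:
        # Trust only the given CA bundle (PEM format).
        ctx.load_verify_locations(cafile=cafile)
    else:
        # Fall back to the system's default trusted CAs.
        ctx.load_default_certs()
    if certfile:
        # Client certificate, needed only if the broker requests one.
        ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
    if seclevel is not None:
        # Optional: lower the OpenSSL security level for old brokers (weakens security).
        ctx.set_ciphers("DEFAULT:@SECLEVEL=%d" % seclevel)
    return ctx
```

With the file names from the example above, this would be used as `ssl_context=build_kafka_ssl_context(cafile='CARoot.pem', certfile='certificate.pem', check_hostname=False)`, adding `seclevel=0` only if the handshake still fails without it.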