pycurl 实现web服务质量探测
这是一个小工具,用于测试接口响应时间,包括 DNS 解析时间、TCP 建立连接时间、传输数据时间等等。
程序使用 Python3 + pycurl 库。pycurl 本质还是调用 libcurl 库,在 Linux 运行没问题;Windows 7 及以前的系统由于本身并不包含 libcurl,可能会出现不可预知的问题,不保证一定能使用。
windows7使用,需要安装 vc++ 2015
使用说明
使用标准: 使用时,url部分,请携带协议,即 http://或https://
下载附件,解压,放到一个目录下,如: F:\chkurl\
打开 cmd
使用 F: 切换到 F盘
使用 cd chkurl 切换目录
使用 chkurl.exe -h 查看帮助,命令参数介绍如下:
1Usage: chkurl.py - [OPTIONS] -u [url]
2
3Options:
4 -h, --help 显示帮助
5 -e, --enable_reuse 启用TCP复用连接,默认不启用
6 -k, --insecure 忽略https证书检查(如过期、不受信任、域名不匹配)
7 -L, --location 跟随重定向,默认,不跟随
8 -r 10, --redirs_num 10 指定最大重定向次数,默认 10
9 -D 0, --dns_cache 0 指定DNS缓存时间,0为不缓存,默认 0
10 --timeout=TIMEOUT 指定请求超时时间
11 --connect_time=CONNECT_TIME 连接超时时间
12 -s, --size 显示下载、上传、下载速度与上传速度
13 -u https://lenovo.com, --url 指定测试的URL
14 -c 3, --count=3 指定测试的次数
15 -H HEADER, --header=HEADER 添加 http 头部信息
16 -i IP, --ip_address=IP 指定IP,绕过DNS解析
17 -p, --post 使用post提交
18 -d POST_DATA, --data=POST_DATA post提交的数据
19 -F s.json, --data_file=s.json 从文件读取内容并提交post的数据
20 -l time.log, --logfile=time.log 指定测试结果日志文件的路径
21 -b, --body 显示 html body 主体内容,只显示第一次的
对于命令行参数, -H 添加 http头,若要添加多个,请多次使用 -H, 如:
1chkurl -H "Content-Type:application/json" -H "Connection: keep-alive" -H "xxxxxx"
对于 -i 选项,是指定ip地址,直接使用指定的地址,而不进行DNS解析,可以指定多个,与 -H 选项相同,如需指定多个,请多次使用 -i 选项
指定多个IP的时候,会循环测试,即,先完成第一个IP的 -c 次数后,进行第二个IP,之后进行第三个、第四个、第五个一直到结束
指定IP的时候,请务必指定 http:// 或者 https:// 协议
命令执行完全后,会生成记录文件,如果没有使用 -l 指定位置,则位于当前目录下 time.log
使用配置文件
可以不指定任何参数,直接运行程序,参数在配置文件定义
PS: 在使用命令行参数的时候,只要没有指定 -u ,便会使用配置文件,覆盖全部的参数
默认位于程序执行目录下,有一个 config.json 的文件,可在内部配置以上的参数信息
配置文件内容如下:
1{
2 "conf":{
3 "baseconfig":{
4 "tcp_reuse":false,
5 "follow_redirect":true,
6 "max_redirects":10,
7 "dns_cache":0,
8 "timeout":120,
9 "connect_time":120,
10 "cert_check":false
11 }
12 },
13 "display":{
14 "data_size":false,
15 "body":false
16 },
17 "request":{
18 "url":"https://www.lenovo.com.cn/my_backup",
19 "count":3,
20 "http_header": null,
21 "ip_address":null,
22 "post":true,
23 "post_data":null,
24 "post_data_file":null
25 },
26 "logfile":"time2.log"
27}
配置项说明:
1baseconfig:
2 tcp_reuse # TCP连接复用
3 follow_redirect: #跟随重定向
4 max_redirects: #最大重定向次数
5 dns_cache: # dns缓存,0为不缓存
6 timeout: #请求超时
7 connect_time #连接超时
8 cert_check: 忽略证书检查(https)
9
10display:
11 data_size: # 显示下载,上传的数据大小,以及上载和上传的速度
12 body: # 显示 html body
13request:
14 url: # 要测试的url
15 count: # 测试的次数
16 http_header: #添加http header, (必须是数组类型)
17 ip_address: 指定域名的IP,不通过DNS解析 (必须是数组类型)
18 post: # 使用 post 请求
19 post_data: # post请求提交的数据,如果是json字符串,请转义,因为这个配置文件本身也是 json格式
20 post_data_file # 从一个文件读取内容,并提交
21 logfile: # 指定记录文件
request 配置中的 http_header , ip_address ,填写的时候,需要使用数组方式,如不需要,使用空值 null
1["192.168.121.171", "172.19.79.136", "17.93.2.88"]
2
3["Content-Type:application/json", "Connection: keep-alive"]
对于 POST 请求,如果需要提交数据的话,普通字符串,可以直接提交,部分特殊字符需要转义, 如单引号、双引号、百分号%(百分号仅限windows系统)。
例如使用windows系统的客户端测试,post 请求时提交json字符串的情况,需要转义。例如: 正常的 json 字符串:
1{"info":{"name":"zhangsan","age":18}}
转义后的字符串:
1{\"info\":{\"name\":\"zhangsan\",\"age\":18}}
转义后方可提交,例如:
1chkurl.exe -c 3 -u https://www.lenovo.com.cn -p -d '{\"info\":{\"name\":\"zhangsan\",\"age\":18}}'
附: 程序源码
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4import logging
5# import certifi # windows
6import time
7import pycurl
8from io import BytesIO
9from urllib.parse import urlparse
10from optparse import OptionParser
11import json
12
13
class ex_curl(object):
    """Thin wrapper around ``pycurl.Curl`` that collects per-request
    timing statistics (DNS lookup, TCP connect, transfer times, sizes,
    speeds) for a single URL.

    Parameters:
        url          -- target URL; must carry the scheme (http:// or https://)
        max_redirs   -- maximum number of redirects to follow (None = curl default)
        forbid_flush -- truthy: allow TCP connection reuse between requests;
                        falsy/None: force a fresh connection every time
        dns_cache    -- DNS cache lifetime in seconds; 0 (the default) disables
                        caching so every request resolves again
        timeout      -- whole-request timeout in seconds (None = no limit)
        connect_time -- connect-phase timeout in seconds (None = no limit)
        headers      -- list of extra "Name: value" HTTP header strings
        redirect     -- truthy: follow HTTP redirects (FOLLOWLOCATION)
        ssl_check    -- truthy: SKIP certificate and hostname verification
                        (equivalent to curl's --insecure / -k)
    """

    def __init__(self, url, max_redirs=None, forbid_flush=None, dns_cache=None,
                 timeout=None, connect_time=None, headers=None, redirect=None,
                 ssl_check=None):
        self.c = pycurl.Curl()
        self.buffer = BytesIO()

        # self.c.setopt(pycurl.CAINFO, certifi.where())
        # NOTE: the CAINFO line above is needed on Windows, where libcurl
        # cannot locate a root-CA bundle by itself; Linux needs nothing.
        self.c.setopt(pycurl.WRITEDATA, self.buffer)
        self.c.setopt(pycurl.URL, url)

        if ssl_check:
            # -k / --insecure: disable both peer-certificate and
            # hostname verification.
            self.c.setopt(pycurl.SSL_VERIFYPEER, 0)
            self.c.setopt(pycurl.SSL_VERIFYHOST, 0)

        if max_redirs is not None:
            self.c.setopt(pycurl.MAXREDIRS, max_redirs)

        # Default (reuse not requested): force a new TCP connection per
        # request so the connect timings stay meaningful.
        self.c.setopt(pycurl.FORBID_REUSE, 0 if forbid_flush else 1)

        if dns_cache is None:
            dns_cache = 0  # 0 seconds == resolve the name on every request
        self.c.setopt(pycurl.DNS_CACHE_TIMEOUT, dns_cache)

        if timeout is not None:
            self.c.setopt(pycurl.TIMEOUT, timeout)
        if connect_time is not None:
            self.c.setopt(pycurl.CONNECTTIMEOUT, connect_time)
        if headers is not None:
            self.c.setopt(pycurl.HTTPHEADER, headers)
        if redirect:
            self.c.setopt(pycurl.FOLLOWLOCATION, 1)

    def GetRequest(self):
        """Perform a GET request.

        On any transport error the handle and buffer are released and
        the whole process exits with status 2 (CLI-tool behavior).
        """
        try:
            self.c.perform()
        except Exception as e:
            print('connection error:' + str(e))
            self.buffer.close()
            self.c.close()
            exit(2)
        return self.c

    def PostRequest(self, post_data=None):
        """Perform a POST request submitting *post_data* (empty string
        when None).  Exits the process with status 2 on transport errors.
        """
        if post_data is None:
            post_data = ''
        self.c.setopt(pycurl.POSTFIELDS, post_data)
        try:
            self.c.perform()
        except Exception as e:
            print('connection error:' + str(e))
            self.buffer.close()
            self.c.close()
            exit(2)
        return self.c

    def getinfo(self):
        """Collect the statistics of the last transfer.

        Returns ``{'body': <bytes>, 'data': {...}}`` where every
        ``*_time`` entry is converted from seconds to milliseconds.
        The response buffer is reset afterwards so the same handle can
        be reused for the next measurement run.
        """
        info = {'body': {}, 'data': {}}
        info['data']['http_code'] = self.c.getinfo(pycurl.HTTP_CODE)
        info['data']['total_time'] = self.c.getinfo(pycurl.TOTAL_TIME) * 1000
        info['data']['namelookup_time'] = self.c.getinfo(pycurl.NAMELOOKUP_TIME) * 1000
        info['data']['connect_time'] = self.c.getinfo(pycurl.CONNECT_TIME) * 1000
        info['data']['pretransfer_time'] = self.c.getinfo(pycurl.PRETRANSFER_TIME) * 1000
        info['data']['starttransfer_time'] = self.c.getinfo(pycurl.STARTTRANSFER_TIME) * 1000
        info['data']['redirect_time'] = self.c.getinfo(pycurl.REDIRECT_TIME) * 1000
        info['data']['upload_size'] = self.c.getinfo(pycurl.SIZE_UPLOAD)
        info['data']['download_size'] = self.c.getinfo(pycurl.SIZE_DOWNLOAD)
        info['data']['download_speed'] = self.c.getinfo(pycurl.SPEED_DOWNLOAD)
        info['data']['upload_speed'] = self.c.getinfo(pycurl.SPEED_UPLOAD)
        info['data']['http_header_size'] = self.c.getinfo(pycurl.HEADER_SIZE)

        info['body'] = self.buffer.getvalue()
        # BUGFIX: truncate(0) alone leaves the stream position at the old
        # end-of-data, so the next perform() would zero-pad the buffer
        # before writing.  Rewind first so every body starts at offset 0.
        self.buffer.seek(0)
        self.buffer.truncate(0)
        return info

    def cls(self):
        """Release the response buffer and the curl handle."""
        self.buffer.close()
        self.c.close()
        return None
128
129# def GetDomain(url):
130# domain = urlparse(url).netloc
131# if ":" in domain:
132# return domain.split(":")
133# return domain
134
135
def Repurl(url, ip):
    """Rebuild *url* with its hostname replaced by *ip*.

    Used by the -i option: the request is sent straight to the given IP
    (bypassing DNS) while the original hostname is returned so the caller
    can attach a matching "Host:" header.  An explicit ":port" in the
    netloc is preserved.  The URL must carry its scheme (http/https).

    Returns {'url': <rewritten url>, 'domain': <original hostname>}.

    Note: the original implementation duplicated the whole URL-rebuild
    logic in two branches (with / without port); str.partition keeps a
    single path for both cases.
    """
    parts = urlparse(url)
    # partition keeps sep == ":" only when a port is present, so the
    # rebuilt netloc keeps its ":port" suffix exactly when there was one.
    host, sep, port = parts.netloc.partition(":")
    new_url = parts.scheme + "://" + ip + sep + port + parts.path
    if len(parts.query) > 0:
        new_url += "?" + parts.query
    if len(parts.fragment) > 0:
        new_url += "#" + parts.fragment

    return {"url": new_url, "domain": host}
160
161
def _load_config(options):
    """Fill *options* from ./config.json (used when -u was not given).

    Exits with status 2 when the file is missing or not valid JSON.
    """
    print("You did not specify a URL. Read the config.json configuration file")
    try:
        with open("config.json", "rb") as f:
            global_conf = json.loads(f.read())
    except Exception as e:
        print(str(e))
        exit(2)
    base = global_conf['conf']['baseconfig']
    # BUGFIX: this value was stored in options.forbid_reuse, which nothing
    # reads -- the requests consume options.tcp_reuse, so the config's
    # tcp_reuse setting never took effect.
    options.tcp_reuse = base['tcp_reuse']
    options.redirect = base['follow_redirect']
    options.redirs = base['max_redirects']
    options.dns_cache = base['dns_cache']
    options.timeout = base['timeout']
    options.connect_time = base['connect_time']
    options.cert_check = base['cert_check']
    options.size = global_conf['display']['data_size']
    options.body = global_conf['display']['body']
    request = global_conf['request']
    options.url = request['url']
    options.count = request['count']
    options.header = request['http_header']
    options.ipaddress = request['ip_address']
    options.post_commit = request['post']
    options.post_data = request['post_data']
    options.post_data_file = request['post_data_file']
    options.logfile = global_conf['logfile']


def _resolve_post_data(options):
    """Return (use_post, data) for the POST-related options.

    -d/--data takes precedence over -F/--data_file; with neither given,
    an empty POST is sent.  Exits with status 2 when the data file cannot
    be read.

    BUGFIX: the original resolved the data and then unconditionally
    overwrote it (with None in the -i branch and with options.post_data
    in the plain branch), silently discarding -d / -F input.
    """
    if not options.post_commit:
        return False, None
    if options.post_data is not None:
        return True, options.post_data
    if options.post_data_file is not None:
        try:
            with open(options.post_data_file, "r") as f:
                return True, f.read()
        except Exception as e:
            print(str(e))
            exit(2)
    return True, None


def _metric_table(show_size):
    """Report rows: display label -> key in the per-run stats dict.

    With *show_size* the download/upload size and speed rows are included
    (the -s flag); insertion order fixes the report order.
    """
    table = {
        'HTTP Code:\t': 'http_code',
        'Total Time:\t': 'total_time',
        'DNS Lookup:\t': 'namelookup_time',
        'TCP Connect:\t': 'connect_time',
        'Pretransfer:\t': 'pretransfer_time',
        'StartTransfer:\t': 'starttransfer_time',
        'Redirect:\t': 'redirect_time',
    }
    if show_size:
        table['Download Size:\t'] = 'download_size'
        table['Upload Size:\t'] = 'upload_size'
        table['Download Speed:\t'] = 'download_speed'
        table['Upload Speed:\t'] = 'upload_speed'
    table['Header Size:\t'] = 'http_header_size'
    return table


def _format_row(label, key, runs):
    """Build one log line for metric *key*: every run's value followed by
    a min/max/avg summary (summary skipped for code and header size)."""
    text = label
    values = []
    for idx, run in enumerate(runs):
        value = run[key]
        values.append(value)
        if key in ("download_speed", "upload_speed"):
            text += "No.%d: %.2f bytes/s, " % (idx + 1, value)
        elif key in ("download_size", "upload_size"):
            text += "No.%d: %.2f bytes, " % (idx + 1, value)
        elif type(value) == float:
            # pycurl times are floats (already in ms); ints fall through.
            text += "No.%d: %.2f ms, " % (idx + 1, value)
        else:
            text += "No.%d: %d, " % (idx + 1, value)
    if key not in ("http_code", "http_header_size") and values:
        text += "\t[Min: %.2f, Max: %.2f, Avg: %.2f]" % (
            min(values), max(values), sum(values) / len(values))
    return text


def _run_batch(curl, count, post_req, post_req_data):
    """Perform *count* requests on *curl*; return (stats_list, bodies)."""
    stats, bodies = [], []
    for _ in range(count):
        if post_req:
            curl.PostRequest(post_req_data)
        else:
            curl.GetRequest()
        result = curl.getinfo()
        stats.append(result['data'])
        bodies.append(result['body'])
    return stats, bodies


if __name__ == '__main__':
    parser = OptionParser(usage="usage: %prog - [OPTIONS] -u [url]")
    parser.add_option("-e", "--enable_reuse", action="store_true",
                      dest="tcp_reuse",
                      help="Enable TCP connection reuse")
    parser.add_option("-k", "--insecure", action="store_true",
                      dest="cert_check",
                      help="Allow insecure server connections when using SSL")
    parser.add_option("-L", "--location", action="store_true",
                      dest="redirect",
                      help="Follow redirection, Default no,")
    parser.add_option("-r", "--redirs_num", type="int", action="store",
                      dest="redirs",
                      help="Maximum number of redirects")
    parser.add_option("-D", "--dns_cache", type="int", action="store",
                      dest="dns_cache",
                      help="DNS cache, 0 by default, no cache")
    parser.add_option("--timeout", type="int", action="store",
                      dest="timeout",
                      help="Request timeout")
    parser.add_option("--connect_time", type="int", action="store",
                      dest="connect_time",
                      help="Connect timeout")
    parser.add_option("-s", "--size", action="store_true",
                      dest="size",
                      help="Display the download and upload speed and data size")
    parser.add_option("-u", "--url", action="store",
                      dest="url",
                      help="Specify URL")
    parser.add_option("-c", "--count", type="int", action="store",
                      dest="count",
                      help="Specify the number of tests")
    parser.add_option("-H", "--header", action="append",
                      dest="header",
                      help="Add http header, Multiple, please use comma to separate.")
    parser.add_option("-i", "--ip_address", action="append",
                      dest="ipaddress",
                      help="Specify IP address, bypass DNS resolution")
    parser.add_option("-p", "--post", action="store_true",
                      dest="post_commit",
                      help="Post request, Data must be submitted")
    parser.add_option("-d", "--data", action="store",
                      dest="post_data",
                      help="Define the data submitted by POST")
    parser.add_option("-F", "--data_file", action="store",
                      dest="post_data_file",
                      help="Submit POST data from file")
    parser.add_option("-l", "--logfile", action="store",
                      dest="logfile",
                      help="Define log file path")
    parser.add_option("-b", "--body", action="store_true",
                      dest="body",
                      help="Displays the body of the page, Last One")
    (options, argv) = parser.parse_args()

    # Without -u, every parameter comes from the config file.
    if options.url is None:
        _load_config(options)

    exec_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
    log_file = options.logfile if options.logfile is not None else "time.log"

    # Every report line goes verbatim to both the log file and the console.
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter("%(message)s")
    fh = logging.FileHandler(log_file, mode='a')
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    logger.info("%s Test Website: %s, \tNumber of tests: %s" %
                (exec_time, options.url, options.count))

    post_req, post_req_data = _resolve_post_data(options)

    ip_results = {}   # -i mode: {ip: {'data': [run stats, ...]}}
    plain_stats = []  # plain mode: [run stats, ...]

    if options.ipaddress is not None:
        # -i given: replace the hostname with each IP in turn (bypassing
        # DNS) and send a Host: header carrying the original domain.
        # The URL must include its scheme (http:// or https://).
        for ips in options.ipaddress:
            req_url = Repurl(options.url, ips)
            header_list = ["Host: " + req_url["domain"]]
            if options.header is not None:
                header_list.extend(options.header)
            t = ex_curl(req_url["url"], options.redirs, options.tcp_reuse,
                        options.dns_cache, options.timeout,
                        options.connect_time, header_list, options.redirect,
                        ssl_check=options.cert_check)
            stats, bodies = _run_batch(t, options.count, post_req, post_req_data)
            ip_results[ips] = {'data': stats}
            if options.body and bodies:
                # BUGFIX: original printed list1['body'], a key that was
                # never set (KeyError).  Show the first response body.
                print(bodies[0].decode('utf-8'))
            t.cls()
    else:
        t = ex_curl(options.url, options.redirs, options.tcp_reuse,
                    options.dns_cache, options.timeout, options.connect_time,
                    options.header, options.redirect, options.cert_check)
        plain_stats, bodies = _run_batch(t, options.count, post_req, post_req_data)
        if options.body and bodies:
            # BUGFIX: original did list2['body'][0] on a list (TypeError);
            # the first run's body is what -b promises to display.
            print(bodies[0].decode('utf-8'))
        t.cls()

    metrics = _metric_table(options.size)

    if options.ipaddress:
        for ips2 in options.ipaddress:
            logging.info(ips2)
            for label, key in metrics.items():
                logging.info(_format_row(label, key, ip_results[ips2]['data']))
            logging.info("\n")
    else:
        for label, key in metrics.items():
            logging.info(_format_row(label, key, plain_stats))
        logging.info("\n")