背景:
实际业务场景中,多数接口的实现可能都是用的同一套实现逻辑。比方说当前团队所有业务所有服务涉及到身份认证、签名校验服务的,对于开发人员而言,都是同一套实现逻辑。但是从测试人员的角度来说,接口校验中其实有很多都是重复的测试用例。
为减少测试人员的重复性工作,基于Python和Flask技术栈,我们可以实现接口自动化扫描服务,该服务介入到测试流程体系中,提升测试效率。
服务落地价值:
践行测试左移,要求开发必须通过该扫描工具后,才能发起提测申请。
服务实现设计:
本服务除了实现通用的headers校验,也增加了安全漏洞扫描(包括xss漏洞和sql注入)的服务实现,可以根据团队要求,自定义增加新的扫描策略,落地在扫描逻辑中。
对外协议(所有接口信息的传递方式)
单接口扫描、
单接口结果获取、
批量接口扫描、
批量接口结果获取扫描策略:
安全漏洞(owasptop10)、
团队内通用的身份校验、
协议规范、
通用入参校验表结构
扫描服务采用redis存储数据
redis表结构的设计:
key – scan_id:唯一标识
b’value’ – value描述一个字典结构,{“result”: True/False, “scan_strategy”: [“sqlInjection”, “xss”], “fail_data”: [{“attack”: “or 1=1”, “scan_key”: “kk”, “status_code”: 403, “res”: “text”}, {“attack”: “or 1=1”, “scan_key”: “kk”, “status_code”: 403, “res”: “text”}]}
服务实现项目细节
项目结构
----apiScanServer\
|----app.py #apiScan服务主体
|----main.py #启动扫描服务入口
|----scan_business\
| |----headersCheck.py #通用请求头校验逻辑
| |----requiredHeadersDb.yml #接口请求数据,包括路由和headers
| |----sqlInjectDB.txt #sql注入payload
| |----sqlInjection.py #sql注入校验逻辑
| |----sqlInjection_checkDB.txt #sql注入响应体校验
| |----xssInjection.py #xss漏洞校验逻辑
| |----xssInjectionDB.txt #xss校验payload
| |----__init__.py
判断逻辑(以sql注入为例:sqlInjection.py)
def sql_injection_scan(method, url, headers=None, data=None, data_type=None):
if data is None:
return []
with open(file=DIR + '\\scan_business\\sqlInjectDB.txt', mode='r', encoding='utf-8') as f:
attack_f = f.readlines()
with open(file=DIR + '\\scan_business\\sqlInjection_checkDB.txt', mode='r', encoding='utf-8') as f2:
check_list = f2.readlines()
data = deepcopy(data)
fail_data = []
for k in data.keys():
for attack in attack_f:
data[k] = attack
if data_type == 'params':
res = requests.request(method=method, url=url, params=data, headers=headers)
else:
res = requests.request(method=method, url=url, json=data, headers=headers)
# 接口结果分析
for i in check_list:
if i in res.text:
fail_data.append(
{"sqlInjection_attack": attack, "scan_key": k, "status_code": res.status_code, "res": res.text})
return fail_data
框架调用主体(app.py)
单接口扫描
# 单接口扫描
@app.route('/scan', methods=['POST'])
def api_scan():
# 入参协议校验
try:
validate(instance=request.json, schema=api_scan_schema)
except jsonschema.exceptions.ValidationError as err:
print(f'{request.remote_addr} request json error, err msg: {err}!')
# 没有传url和method的时候返回这个
return jsonify({"scan_id": None, "scan_status": False}), 400
# 生成scan_id
create_scan_id = str(int(time.time() * 1000)) + '_id'
# 基准测试 校验是否符合扫描规则
if 'params' in request.json.keys():
data_type = 'params'
re_data = request.json['params']
res = requests.request(method=request.json['method'], url=request.json['url'], headers=request.json['headers'],
params=request.json['params'])
elif 'json' in request.json.keys():
data_type = 'json'
re_data = request.json['json']
res = requests.request(method=request.json['method'], url=request.json['url'], headers=request.json['headers'],
json=request.json['json'])
else:
data_type = None
re_data = None
res = requests.request(method=request.json['method'], url=request.json['url'], headers=request.json['headers'])
if res.status_code != 200:
# 被测服务状态码不是200的时候返回这个
return jsonify({"scan_id": create_scan_id, "scan_status": False}), 403
# 执行扫描策略
def scan(scan_id, method, url, headers, data, data_type):
all_scan_res = []
# sql注入扫描
all_scan_res.append(sql_injection_scan(method=method, url=url,
headers=headers,
data=data, data_type=data_type))
# xss扫描
all_scan_res.append(xss_injection_scan(method=method, url=url,
headers=headers,
data=data, data_type=data_type))
# headers必填字段校验
all_scan_res.append(headers_required_scan(method=method, url=url,
headers=headers,
data=data, data_type=data_type))
# 扫描完成后
redis_conn.set(f'scan_id:{scan_id}', json.dumps(all_scan_res), ex=3600 * 24)
t = Thread(target=scan, args=(
create_scan_id, request.json['method'], request.json['url'], request.json['headers'], re_data, data_type))
t.start()
return jsonify({"scan_id": create_scan_id, "scan_status": True}), 200
单接口获取扫描结果
# 获取扫描结果
@app.route('/scan_result', methods=['GET'])
def scan_result():
scan_id = request.args['scan_id']
res = redis_conn.get(f'scan_id:{scan_id}')
if res is not None:
# 如果res是非空,说明开启扫描过,可以开始解码列表返回给res
res = json.loads(res.decode('utf-8'))
else:
# 如果res是空,说明未开启扫描,直接返回no scanning
return jsonify({"result": False, "status": "no scanning", "fail_data": res}), 200
# 上面取得的res,如果是空列表,说明没有扫描出来失败的
if res is []:
result = True
else:
result = False
# 如果有失败的,列表不为空,result为Flase
return jsonify({"result": result, "status": "scan success", "fail_data": res}), 200
批量接口扫描
# 批量接口扫描
@app.route('/batch_scan', methods=['POST'])
def api_batch_scan():
scan_result_ids = []
print(scan_result_ids)
if isinstance(request.json, list):
threads = []
for api_info in request.json:
# 入参协议校验
try:
validate(instance=api_info, schema=api_scan_schema)
except jsonschema.exceptions.ValidationError as err:
print(f'{request.remote_addr} request json error, err msg: {err}!')
scan_result_ids.append(f"[url:{api_info['url']},headers:{api_info['headers']}]:scan_status is False")
continue
# return jsonify({"scan_id": None, "scan_status": False}), 400
# 生成scan_id
create_scan_id = str(int(time.time() * 1000)) + '_id'
# 基准测试 校验是否符合扫描规则
if 'params' in api_info.keys():
data_type = 'params'
re_data = api_info['params']
res = requests.request(method=api_info['method'], url=api_info['url'], headers=api_info['headers'],
params=api_info['params'])
elif 'json' in api_info.keys():
data_type = 'json'
re_data = api_info['json']
res = requests.request(method=api_info['method'], url=api_info['url'], headers=api_info['headers'],
json=api_info['json'])
else:
data_type = None
re_data = None
res = requests.request(method=api_info['method'], url=api_info['url'], headers=api_info['headers'])
if res.status_code != 200:
# 被测服务状态码不是200的时候返回这个
scan_result_ids.append(f"[url:{api_info['url']},headers:{api_info['headers']}]:status_code is 403")
continue
# return jsonify({"scan_id": create_scan_id, "scan_status": False}), 403
# 执行扫描策略
def scan(scan_id, method, url, headers, data, data_type):
all_scan_res = []
# sql注入扫描
all_scan_res.append(sql_injection_scan(method=method, url=url,
headers=headers,
data=data, data_type=data_type))
# xss扫描
all_scan_res.append(xss_injection_scan(method=method, url=url,
headers=headers,
data=data, data_type=data_type))
# headers必填字段校验
all_scan_res.append(headers_required_scan(method=method, url=url,
headers=headers,
data=data, data_type=data_type))
# 扫描完成后
scan_result_ids.append(scan_id)
# print(f"扫描完成后:{scan_result_ids}")
redis_conn.set(f'scan_id:{scan_id}', json.dumps(all_scan_res), ex=3600 * 24)
t = Thread(target=scan, args=(
create_scan_id, api_info['method'], api_info['url'], api_info['headers'], re_data, data_type))
t.start()
# print(f"线程启动:{scan_result_ids}")
threads.append(t)
for t in threads:
t.join()
# print("返回批量信息")
return jsonify({"batch_scan_list": scan_result_ids}), 200
return jsonify({"msg": "request data should be type of list"}), 400
批量结果获取
# 批量获取扫描结果
@app.route('/batch_scan_result', methods=['POST'])
def batch_scan_result():
scan_ids = request.json
if isinstance(scan_ids, list): # 对请求体进行入参校验
results = []
for scan_id in scan_ids:
fail_data = redis_conn.get(f'scan_id:{scan_id}')
# fail_data = dict.get(scan_id, None)
if fail_data is not None:
fail_data = json.loads(fail_data.decode('utf-8'))
result = True
for lst in fail_data:
if lst:
result = False
break
else:
result = True
results.append({"scan_id": scan_id, "result": result, "fail_data": fail_data})
else:
results.append(
{"scan_id": scan_id, "result": False, "status": "without scanning record", "fail_data": fail_data})
return jsonify({"results": results}), 200
return jsonify({"msg": "request data should be type of list"}), 400
验证可用性
批量扫描接口,返回扫描结果id
通过结果id列表,批量查找id的实际结果