hxz
发布于 2023-10-13 / 139 阅读
0

【apiScanServer】基于Flask实现接口自动化检测服务

背景:

实际业务场景中,多数接口的实现可能都是用的同一套实现逻辑。比方说当前团队所有业务所有服务涉及到身份认证、签名校验服务的,对于开发人员而言,都是同一套实现逻辑。但是从测试人员的角度来说,接口校验中其实有很多都是重复的测试用例。

为减少测试人员的重复性工作,基于Python和Flask技术栈,我们可以实现接口自动化扫描服务,该服务介入到测试流程体系中,提升测试效率。

服务落地价值:

践行测试左移,要求开发必须通过该扫描工具后,才能发起提测申请。

服务实现设计:

本服务除了实现通用的headers校验,也增加了安全漏洞扫描(包括xss漏洞和sql注入)的服务实现,可以根据团队要求,自定义增加新的扫描策略,落地在扫描逻辑中。

  1. 对外协议(所有接口信息的传递方式)
    单接口扫描、
    单接口结果获取、
    批量接口扫描、
    批量接口结果获取

  2. 扫描策略:
    安全漏洞(owasptop10)、
    团队内通用的身份校验、
    协议规范、
    通用入参校验

  3. 表结构

扫描服务采用redis存储数据
redis表结构的设计:
key – scan_id:唯一标识
b’value’ – value描述一个字典结构,{“result”: True/False, “scan_strategy”: [“sqlInjection”, “xss”], “fail_data”: [{“attack”: “or 1=1”, “scan_key”: “kk”, “status_code”: 403, “res”: “text”}, {“attack”: “or 1=1”, “scan_key”: “kk”, “status_code”: 403, “res”: “text”}]}

服务实现项目细节

项目结构

----apiScanServer\
    |----app.py  #apiScan服务主体
    |----main.py    #启动扫描服务入口
    |----scan_business\
    |    |----headersCheck.py   #通用请求头校验逻辑
    |    |----requiredHeadersDb.yml     #接口请求数据,包括路由和headers
    |    |----sqlInjectDB.txt   #sql注入payload
    |    |----sqlInjection.py   #sql注入校验逻辑
    |    |----sqlInjection_checkDB.txt  #sql注入响应体校验
    |    |----xssInjection.py   #xss漏洞校验逻辑
    |    |----xssInjectionDB.txt    #xss校验payload
    |    |----__init__.py

判断逻辑(以sql注入为例:sqlInjection.py)

def sql_injection_scan(method, url, headers=None, data=None, data_type=None):
    if data is None:
        return []

    with open(file=DIR + '\\scan_business\\sqlInjectDB.txt', mode='r', encoding='utf-8') as f:
        attack_f = f.readlines()

    with open(file=DIR + '\\scan_business\\sqlInjection_checkDB.txt', mode='r', encoding='utf-8') as f2:
        check_list = f2.readlines()

    data = deepcopy(data)
    fail_data = []
    for k in data.keys():
        for attack in attack_f:
            data[k] = attack
            if data_type == 'params':
                res = requests.request(method=method, url=url, params=data, headers=headers)
            else:
                res = requests.request(method=method, url=url, json=data, headers=headers)
            # 接口结果分析
            for i in check_list:
                if i in res.text:
                    fail_data.append(
                        {"sqlInjection_attack": attack, "scan_key": k, "status_code": res.status_code, "res": res.text})
    return fail_data

框架调用主体(app.py)

单接口扫描

# 单接口扫描
@app.route('/scan', methods=['POST'])
def api_scan():
    # 入参协议校验
    try:
        validate(instance=request.json, schema=api_scan_schema)
    except jsonschema.exceptions.ValidationError as err:
        print(f'{request.remote_addr} request json error, err msg: {err}!')
        # 没有传url和method的时候返回这个
        return jsonify({"scan_id": None, "scan_status": False}), 400

    # 生成scan_id
    create_scan_id = str(int(time.time() * 1000)) + '_id'

    # 基准测试 校验是否符合扫描规则
    if 'params' in request.json.keys():
        data_type = 'params'
        re_data = request.json['params']

        res = requests.request(method=request.json['method'], url=request.json['url'], headers=request.json['headers'],
                               params=request.json['params'])
    elif 'json' in request.json.keys():
        data_type = 'json'
        re_data = request.json['json']
        res = requests.request(method=request.json['method'], url=request.json['url'], headers=request.json['headers'],
                               json=request.json['json'])
    else:
        data_type = None
        re_data = None
        res = requests.request(method=request.json['method'], url=request.json['url'], headers=request.json['headers'])
    if res.status_code != 200:
        # 被测服务状态码不是200的时候返回这个
        return jsonify({"scan_id": create_scan_id, "scan_status": False}), 403

    # 执行扫描策略
    def scan(scan_id, method, url, headers, data, data_type):
        all_scan_res = []
        # sql注入扫描
        all_scan_res.append(sql_injection_scan(method=method, url=url,
                                               headers=headers,
                                               data=data, data_type=data_type))
        # xss扫描
        all_scan_res.append(xss_injection_scan(method=method, url=url,
                                               headers=headers,
                                               data=data, data_type=data_type))
        # headers必填字段校验
        all_scan_res.append(headers_required_scan(method=method, url=url,
                                                  headers=headers,
                                                  data=data, data_type=data_type))

        # 扫描完成后
        redis_conn.set(f'scan_id:{scan_id}', json.dumps(all_scan_res), ex=3600 * 24)

    t = Thread(target=scan, args=(
        create_scan_id, request.json['method'], request.json['url'], request.json['headers'], re_data, data_type))
    t.start()

    return jsonify({"scan_id": create_scan_id, "scan_status": True}), 200

单接口获取扫描结果

# 获取扫描结果
@app.route('/scan_result', methods=['GET'])
def scan_result():
    scan_id = request.args['scan_id']
    res = redis_conn.get(f'scan_id:{scan_id}')

    if res is not None:
        # 如果res是非空,说明开启扫描过,可以开始解码列表返回给res
        res = json.loads(res.decode('utf-8'))

    else:
        # 如果res是空,说明未开启扫描,直接返回no scanning
        return jsonify({"result": False, "status": "no scanning", "fail_data": res}), 200

    # 上面取得的res,如果是空列表,说明没有扫描出来失败的
    if res is []:
        result = True
    else:
        result = False
    # 如果有失败的,列表不为空,result为Flase
    return jsonify({"result": result, "status": "scan success", "fail_data": res}), 200

批量接口扫描

# 批量接口扫描
@app.route('/batch_scan', methods=['POST'])
def api_batch_scan():
    scan_result_ids = []
    print(scan_result_ids)
    if isinstance(request.json, list):
        threads = []
        for api_info in request.json:
            # 入参协议校验
            try:
                validate(instance=api_info, schema=api_scan_schema)
            except jsonschema.exceptions.ValidationError as err:
                print(f'{request.remote_addr} request json error, err msg: {err}!')
                scan_result_ids.append(f"[url:{api_info['url']},headers:{api_info['headers']}]:scan_status is False")
                continue
                # return jsonify({"scan_id": None, "scan_status": False}), 400

            # 生成scan_id
            create_scan_id = str(int(time.time() * 1000)) + '_id'

            # 基准测试 校验是否符合扫描规则
            if 'params' in api_info.keys():
                data_type = 'params'
                re_data = api_info['params']

                res = requests.request(method=api_info['method'], url=api_info['url'], headers=api_info['headers'],
                                       params=api_info['params'])
            elif 'json' in api_info.keys():
                data_type = 'json'
                re_data = api_info['json']
                res = requests.request(method=api_info['method'], url=api_info['url'], headers=api_info['headers'],
                                       json=api_info['json'])
            else:
                data_type = None
                re_data = None
                res = requests.request(method=api_info['method'], url=api_info['url'], headers=api_info['headers'])
            if res.status_code != 200:
                # 被测服务状态码不是200的时候返回这个
                scan_result_ids.append(f"[url:{api_info['url']},headers:{api_info['headers']}]:status_code is 403")
                continue
                # return jsonify({"scan_id": create_scan_id, "scan_status": False}), 403

            # 执行扫描策略
            def scan(scan_id, method, url, headers, data, data_type):
                all_scan_res = []
                # sql注入扫描
                all_scan_res.append(sql_injection_scan(method=method, url=url,
                                                       headers=headers,
                                                       data=data, data_type=data_type))
                # xss扫描
                all_scan_res.append(xss_injection_scan(method=method, url=url,
                                                       headers=headers,
                                                       data=data, data_type=data_type))
                # headers必填字段校验
                all_scan_res.append(headers_required_scan(method=method, url=url,
                                                          headers=headers,
                                                          data=data, data_type=data_type))

                # 扫描完成后
                scan_result_ids.append(scan_id)
                # print(f"扫描完成后:{scan_result_ids}")
                redis_conn.set(f'scan_id:{scan_id}', json.dumps(all_scan_res), ex=3600 * 24)

            t = Thread(target=scan, args=(
                create_scan_id, api_info['method'], api_info['url'], api_info['headers'], re_data, data_type))
            t.start()
            # print(f"线程启动:{scan_result_ids}")
            threads.append(t)

        for t in threads:
            t.join()
        # print("返回批量信息")
        return jsonify({"batch_scan_list": scan_result_ids}), 200
    return jsonify({"msg": "request data should be type of list"}), 400


批量结果获取

# 批量获取扫描结果
@app.route('/batch_scan_result', methods=['POST'])
def batch_scan_result():
    scan_ids = request.json
    if isinstance(scan_ids, list):  # 对请求体进行入参校验
        results = []
        for scan_id in scan_ids:
            fail_data = redis_conn.get(f'scan_id:{scan_id}')
            # fail_data = dict.get(scan_id, None)
            if fail_data is not None:
                fail_data = json.loads(fail_data.decode('utf-8'))
                result = True
                for lst in fail_data:
                    if lst:
                        result = False
                        break
                    else:
                        result = True

                results.append({"scan_id": scan_id, "result": result, "fail_data": fail_data})
            else:

                results.append(
                    {"scan_id": scan_id, "result": False, "status": "without scanning record", "fail_data": fail_data})
        return jsonify({"results": results}), 200
    return jsonify({"msg": "request data should be type of list"}), 400

验证可用性

批量扫描接口,返回扫描结果id

通过结果id列表,批量查找id的实际结果