105 lines
3.6 KiB
Python
105 lines
3.6 KiB
Python
|
#!/usr/bin/env python3
|
||
|
# -*- coding: utf-8 -*-
|
||
|
# File : safePython.py
|
||
|
# Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
|
||
|
# Date : 2022/8/28
|
||
|
|
||
|
import io
|
||
|
import tokenize
|
||
|
|
||
|
from func_timeout import func_set_timeout
|
||
|
from func_timeout.exceptions import FunctionTimedOut
|
||
|
from urllib.parse import urljoin,quote,unquote
|
||
|
import requests
|
||
|
import time
|
||
|
import json
|
||
|
import re
|
||
|
from lxml import etree
|
||
|
import datetime
|
||
|
import base64
|
||
|
from utils.log import logger
|
||
|
|
||
|
time_out_sec = 8 # 安全执行python代码超时
|
||
|
class my_exception(Exception):
|
||
|
def __init__(self, message):
|
||
|
self.message = message
|
||
|
|
||
|
def __str__(self):
|
||
|
message = f'函数执行超时: "{self.message}"'
|
||
|
return message
|
||
|
|
||
|
@func_set_timeout(time_out_sec)
|
||
|
def excute(*args):
|
||
|
exec(*args)
|
||
|
|
||
|
def check_unsafe_attributes(string):
|
||
|
"""
|
||
|
安全检测需要exec执行的python代码
|
||
|
:param string:
|
||
|
:return:
|
||
|
"""
|
||
|
g = tokenize.tokenize(io.BytesIO(string.encode('utf-8')).readline)
|
||
|
pre_op = ''
|
||
|
for toktype, tokval, _, _, _ in g:
|
||
|
if toktype == tokenize.NAME and pre_op == '.' and tokval.startswith('_'):
|
||
|
attr = tokval
|
||
|
msg = "access to attribute '{0}' is unsafe.".format(attr)
|
||
|
raise AttributeError(msg)
|
||
|
elif toktype == tokenize.OP:
|
||
|
pre_op = tokval
|
||
|
|
||
|
DEFAULT_PYTHON_CODE = """# 可用内置环境变量:
|
||
|
# - log: log(message): 打印日志功能
|
||
|
# - error: 弹出用户错误的弹窗
|
||
|
# 返回变量值: result = {...}\n\n
|
||
|
zyw_lists = env['hikerule.zyw.list'].with_context(active_test=True).sudo().search(
|
||
|
[('option', '=', 'zy'), ('cate_id.name', '!=', '18+'),('cate_id.is_bad', '!=', True)])
|
||
|
result = env['hikerule.zyw.list2data.wizard'].sudo().get_publish_value(zyw_lists)
|
||
|
"""
|
||
|
|
||
|
class safePython:
|
||
|
def __init__(self,name, code):
|
||
|
self.name = name or '未定义'
|
||
|
self.code = code
|
||
|
|
||
|
def action_task_exec(self,call=None,params=None):
|
||
|
"""
|
||
|
接口调用执行函数
|
||
|
:return:
|
||
|
"""
|
||
|
if not params:
|
||
|
params = []
|
||
|
builtins = __builtins__
|
||
|
builtins = dict(builtins).copy()
|
||
|
for key in ['__import__','eval','exec','globals','dir','copyright','open','quit']:
|
||
|
del builtins[key] # 删除不安全的关键字
|
||
|
# print(builtins)
|
||
|
global_dict = {'__builtins__': builtins,
|
||
|
'requests': requests, 'urljoin':urljoin,'quote':quote,'unquote': unquote,
|
||
|
'log': logger.info, 'json': json,'print':print,
|
||
|
're':re,'etree':etree,'time':time,'datetime':datetime,'base64':base64
|
||
|
} # 禁用内置函数,不允许导入包
|
||
|
try:
|
||
|
check_unsafe_attributes(self.code)
|
||
|
localdict = {'result': None}
|
||
|
# 待解决windows下运行超时的问题
|
||
|
base_code = self.code.strip()
|
||
|
if call:
|
||
|
logger.info(f'开始执行:{call}')
|
||
|
try:
|
||
|
# excute(to_run_code, global_dict, localdict)
|
||
|
excute(base_code, global_dict, localdict)
|
||
|
run = localdict.get(call)
|
||
|
if run:
|
||
|
localdict['result'] = run(*params)
|
||
|
except FunctionTimedOut:
|
||
|
raise my_exception(f'函数[{self.name}]运行时间超过{time_out_sec}秒,疑似死循环,已被系统切断')
|
||
|
except Exception as e:
|
||
|
ret = f'执行报错:{e}'
|
||
|
logger.info(ret)
|
||
|
return ret
|
||
|
else:
|
||
|
# print(global_dict)
|
||
|
# print(localdict)
|
||
|
ret = localdict['result']
|
||
|
return ret
|