#!/usr/bin/env python3 # -*- coding: utf-8 -*- # File : admin.py # Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------ # Date : 2022/9/6 import os import ujson from flask import Blueprint, request, render_template, render_template_string, jsonify, make_response, redirect from controllers.service import storage_service, rules_service, parse_service from base.R import R from base.database import db from utils.log import logger import shutil from utils.update import getLocalVer, getOnlineVer, download_new_version, download_lives, copy_to_update from utils import parser from utils.env import get_env, update_env from utils.web import getParmas, verfy_token from js.rules import getRules, getCacheCount from utils.parser import runJScode from werkzeug.utils import secure_filename from utils.web import md5 from utils.common_api import js_render from utils.files import get_jar_list, get_jsd_list, get_drop_js from quickjs import Function, Context admin = Blueprint("admin", __name__) # @admin.route("/",methods=['get']) # def index(): # return R.ok(msg='欢迎进入首页',data=None) # @admin.route("/info",methods=['get']) # def info_all(): # data = storage_service.query_all() # return R.ok(data=data) @admin.route('/') def admin_index(): # 管理员界面 if not verfy_token(): return render_template('login.html') lsg = storage_service() live_url = lsg.getItem('LIVE_URL') use_py = lsg.getItem('USE_PY') js0_password = lsg.getItem('JS0_PASSWORD') # print(f'live_url:', live_url) rules = getRules('js') # print(rules) cache_count = getCacheCount() # print(cache_count) return render_template('admin.html', js0_password=js0_password, pystate=use_py, rules=rules, cache_count=cache_count, ver=getLocalVer(), live_url=live_url) @admin.route('/settings') def admin_settings(): # 管理员界面 if not verfy_token(): return render_template('login.html') lsg = storage_service() # conf_list = 'LIVE_URL|USE_PY|PLAY_URL|PLAY_DISABLE|LAZYPARSE_MODE|WALL_PAPER_ENABLE|WALL_PAPER|UNAME|PWD|LIVE_MODE|LIVE_URL|CATE_EXCLUDE|TAB_EXCLUDE'.split('|') conf_lists = lsg.getStoreConf() # print(conf_lists) jar_lists = get_jar_list() SPIDER_JAR = lsg.getItem('SPIDER_JAR', 'custom_spider.jar') return render_template('settings.html', conf_lists=conf_lists, jar_lists=jar_lists, jar_now=SPIDER_JAR, ver=getLocalVer()) @admin.route('/save_conf', methods=['POST']) def admin_save_conf(): # 管理员界面 if not verfy_token(): # return render_template('login.html') return R.error('请登录后再试') key = getParmas('key') value = getParmas('value') print(f'key:{key},value:{value}') lsg = storage_service() res_id = lsg.setItem(key, value) return R.success(f'修改成功,记录ID为:{res_id}') @admin.route('/update_env', methods=['POST']) def admin_update_env(): # 更新环境变量中的某个值 if not verfy_token(): # return render_template('login.html') return R.error('请登录后再试') key = getParmas('key') value = getParmas('value') print(f'key:{key},value:{value}') ENV = update_env(key, value) return R.success(f'修改成功,最新的完整ENV见data', data=ENV) @admin.route("/edit/", methods=['GET']) def admin_edit_rule(name): # print(name) if not verfy_token(): return render_template('login.html') return render_template('edit_rule.html', name=name) @admin.route("/edit2/", methods=['GET']) def admin_edit2_rule(name): # print(name) if not verfy_token(): return render_template('login.html') return render_template('edit_rule_mobile.html', name=name) @admin.route("/save_edit/", methods=['POST']) def admin_save_edit_rule(name): # print(name) if not verfy_token(): return R.error('请登录后再试') code = getParmas('code') file_path = os.path.abspath(f'js/{name}') if 'var rule' not in code and name != '模板.js': return R.error(f'文件{name}保存失败,未检测到关键词:var rule') if not os.path.exists(file_path): return R.error('服务端没有此文件!' + file_path) logger.info(f'待保存文件路径:{file_path}') with open(file_path, mode='w+', encoding='utf-8') as f: f.write(code) return R.success(f'保存成功') @admin.route("/view/", methods=['GET']) def admin_view_rule(name): return js_render(name) # if not name or not name.split('.')[-1] in ['js','txt','py','json']: # return R.error(f'非法猥亵,未指定文件名。必须包含js|txt|json|py') # try: # env = get_env() # # print(env) # if env.get('js_proxy'): # js_proxy = env['js_proxy'] # burl = request.base_url # if '=>' in js_proxy: # oldsrc = js_proxy.split('=>')[0] # if oldsrc in burl: # newsrc = js_proxy.split('=>')[1] # # print(f'js1源代理已启用,全局替换{oldsrc}为{newsrc}') # rurl = burl.replace(oldsrc, newsrc) # if burl != rurl: # jscode = parser.getJs(name, 'js') # # rjscode = render_template_string(jscode, env=env) # rjscode = jscode # for k in env: # # print(f'${k}', f'{env[k]}') # if f'${k}' in rjscode: # rjscode = rjscode.replace(f'${k}', f'{env[k]}') # # rjscode = render_template_string(jscode, **env) # if rjscode.strip() == jscode.strip(): # 无需渲染才代理 # return redirect(rurl) # else: # logger.info(f'{name}由于存在环境变量无法被依赖代理') # # return parser.toJs(name,'js',env) # except Exception as e: # return R.error(f'非法猥亵\n{e}') @admin.route('/clear/') def admin_clear_rule(name): if not name or not name.split('.')[-1] in ['js', 'txt', 'py', 'json']: return R.error(f'非法猥亵,未指定文件名。必须包含js|txt|json|py') if not verfy_token(): return render_template('login.html') file_path = os.path.abspath(f'js/{name}') print(file_path) if not os.path.exists(file_path): return R.error('服务端没有此文件!' + file_path) os.remove(file_path) return R.ok('成功删除文件:' + file_path) @admin.route('/get_ver') def admin_get_ver(): if not verfy_token(): # return render_template('login.html') return R.error('请登录后再试') lsg = storage_service() update_proxy = lsg.getItem('UPDATE_PROXY') online_ver, msg = getOnlineVer(update_proxy) return jsonify({'local_ver': getLocalVer(), 'online_ver': online_ver, 'msg': msg}) @admin.route('/update_db') def admin_update_db(): if not verfy_token(): # return render_template('login.html') return R.error('请登录后再试') old_dbfile = 'migrations' if os.path.exists(old_dbfile): logger.info(f'开始删除历史数据库迁移文件:{old_dbfile}') shutil.rmtree(old_dbfile) db.session.execute('drop table if exists alembic_version') cmd = 'flask db migrate && flask db upgrade' if not os.path.exists('migrations'): cmd = 'flask db init && ' + cmd logger.info(f'开始执行cmd:{cmd}') result = os.system(cmd) logger.info(f'cmd执行结果:{result}') return R.success('数据库升级完毕') @admin.route('/update_ver') def admin_update_ver(): if not verfy_token(): return R.failed('请登录后再试') lsg = storage_service() update_proxy = lsg.getItem('UPDATE_PROXY') msg = download_new_version(update_proxy) return R.success(msg) @admin.route('/rule_state/', methods=['POST']) def admin_rule_state(state=0): # 管理员修改规则状态 if not verfy_token(): return R.error('请登录后再试') names = getParmas('names') if not names: return R.success(f'修改失败,没有传递names参数') rule_list = names.split(',') rules = rules_service() # print(rules.query_all()) # print(rules.getState(rule_list[0])) # print(rule_list) success_list = [] for rule in rule_list: try: res_id = rules.setState(rule, state) success_list.append(f'{rule}:{res_id}') except: success_list.append(rule) return R.success(f'修改成功,服务器反馈信息为:{success_list}') @admin.route('/rule_order/', methods=['POST']) def admin_rule_order(order=0): # 管理员修改规则顺序 if not verfy_token(): return R.error('请登录后再试') names = getParmas('names') if not names: return R.success(f'修改失败,没有传递names参数') rule_list = names.split(',') rules = rules_service() # print(rules.query_all()) # print(rules.getState(rule_list[0])) # print(rule_list) success_list = [] rule_list.reverse() # 倒序解决时间多重排序问题 for rule in rule_list: try: res_id = rules.setOrder(rule, order) success_list.append(f'{rule}:{res_id}') except: success_list.append(rule) return R.success(f'修改成功,服务器反馈信息为:{success_list}') @admin.route('/parse/save_data', methods=['POST']) def admin_parse_save_data(): # 管理员保存拖拽排序后的解析数据 if not verfy_token(): return R.error('请登录后再试') data = getParmas('data') if not data: return R.success(f'修改失败,没有传递data参数') parse = parse_service() success_list = [] data = ujson.loads(data) new_list = [] new_data = [] for nd in data: if not nd.get('url') and nd.get('name') != '🌐Ⓤ': continue if nd['url'] not in new_list: new_data.append(nd) new_list.append(nd['url']) print(f'去重前:{len(data)},去重后:{len(new_data)}') for i in range(len(new_data)): d = new_data[i] # if not d.get('url') and d.get('name') != '🌐Ⓤ': # continue obj = { 'name': d.get('name', ''), 'url': d.get('url', ''), 'state': d.get('state', 1), 'type': d.get('state', 0), 'order': i + 1, 'ext': d.get('ext', ''), 'header': d.get('header', ''), } # print(obj) try: parse.saveData(obj) success_list.append(f'parse:{d["url"]}') # print(obj) # print(200,obj) except Exception as e: success_list.append(d["url"]) print(f'{d["url"]}失败:{e}') # print(len(success_list)) return R.success(f'修改成功,服务器反馈信息为:{success_list}') @admin.route('/force_update') def admin_force_update(): if not verfy_token(): return R.failed('请登录后再试') ret = copy_to_update() if ret: msg = '升级成功' return R.success(msg) else: msg = '升级失败。具体原因只能去看实时日志(通过9001端口)' return R.failed(msg) @admin.route('/update_lives') def admin_update_lives(): url = getParmas('url') if not url: return R.failed('未提供被同步的直播源远程地址!') if not verfy_token(): return R.failed('请登录后再试') live_url = url success = download_lives(live_url) if success: return R.success(f'直播源{live_url}同步成功') else: return R.failed(f'直播源{live_url}同步失败') @admin.route('/write_live_url') def admin_write_live_url(): url = getParmas('url') if not url: return R.failed('未提供修改后的直播源地址!') if not verfy_token(): return R.failed('请登录后再试') lsg = storage_service() id = lsg.setItem('LIVE_URL', url) msg = f'已修改的配置记录id为:{id}' return R.success(msg) @admin.route('/change_use_py') def admin_change_use_py(): if not verfy_token(): return R.failed('请登录后再试') lsg = storage_service() use_py = lsg.getItem('USE_PY') new_use_py = '' if use_py else '1' state = '开启' if new_use_py else '关闭' id = lsg.setItem('USE_PY', new_use_py) msg = f'已修改的配置记录id为:{id},结果为{state}' return R.success(msg) @admin.route('/clear_drop') def admin_clear_drop(): if not verfy_token(): return R.failed('请登录后再试') jsd_list = get_jsd_list() logger.info(f'jsd文件列表:{jsd_list}') js_list = get_drop_js(jsd_list) rm_list = [] for i in range(len(js_list)): js_file = js_list[i] # shutil.rmtree(js_file, ignore_errors=False, onerror=None) if os.path.exists(js_file): os.remove(js_file) rm_list.append(jsd_list[i][:-1]) logger.info(f'待删除js文件列表:{rm_list}') rm_str = ','.join(rm_list) msg = f'清理完毕,本次共计清理{len(rm_list)}个\n {rm_str}' return R.success(msg) # @admin.route('/get_use_py') # def admin_get_use_py(): # if not verfy_token(): # return R.failed('请登录后再试') # lsg = storage_service() # use_py = lsg.getItem('USE_PY') # state = 1 if use_py else 0 # return R.success(state) @admin.route('/upload', methods=['POST']) def upload_file(): args = request.args force = args.get('force') if not verfy_token(): return render_template('login.html') if request.method == 'POST': try: file = request.files['file'] filename = secure_filename(file.filename) logger.info(f'推荐安全文件命名:{filename}') savePath = f'js/{file.filename}' # print(savePath) if os.path.exists(savePath) and not force: return R.failed(f'上传失败,文件已存在,请先查看删除再试') with open('js/模板.js', encoding='utf-8') as f2: before = f2.read().split('export')[0] end_code = """\nif (rule.模板 && muban.hasOwnProperty(rule.模板)) {rule = Object.assign(muban[rule.模板], rule);}""" upcode = file.stream.read().decode('utf-8') check_to_run = before + upcode + end_code # print(check_to_run) # try: # loader, _ = runJScode(check_to_run) # rule = loader.eval('rule') # if not rule: # return R.failed('文件上传失败,检测到上传的文件不是drpy框架支持的源代码') # except Exception as e: # logger.info(f'上传文件发生了错误:{e}') # return R.failed('文件上传失败,检测到上传的文件不是drpy框架支持的源代码') try: ctx = Context() ctx.eval(check_to_run) js_ret = ctx.get('rule') rule_json = js_ret.json() # 规则的json字符串 ruleDict = ujson.loads(rule_json) if not ruleDict: return R.failed('文件上传失败,检测到上传的文件不是drpy框架支持的源代码') except Exception as e: logger.info(f'上传文件发生了错误:{e}') return R.failed('文件上传失败,检测到上传的文件不是drpy框架支持的源代码') # print(savePath) file.seek(0) # 读取后变成空文件,重新赋能 file.save(savePath) return R.success('文件上传成功') except Exception as e: return R.failed(f'文件上传失败!{e}') else: # return render_template('upload.html') return R.failed('文件上传失败') @admin.route('/login', methods=['GET', 'POST']) def login_api(): username = getParmas('username') password = getParmas('password') autologin = getParmas('autologin') if not all([username, password]): return R.failed('账号密码字段必填') token = md5(f'{username};{password}') check = verfy_token(token=token) if check: # response = make_response(redirect('/admin')) response = make_response(R.success('登录成功')) response.set_cookie('token', token) return response else: return R.failed('登录失败,用户名或密码错误') @admin.route('/logtail') def admin_logtail(): if not verfy_token(): return R.failed('请登录后再试') return render_template('logtail.html') @admin.route('/lives') def admin_lives(): if not verfy_token(): return R.failed('请登录后再试') # print(dir(request)) # 完整地址: request.base_url url # 带http的前缀 host_url root_url # 不带http的前缀 host # 当前路径 path host_url = request.host_url def get_lives(): base_path = os.path.dirname(os.path.abspath(__file__)) # 当前文件所在目录 # print(base_path) live_path = os.path.join(base_path, '../txt/lives') # print(live_path) files = os.listdir(live_path) # print(files) # files = list(filter(lambda x: str(x).endswith('.txt') and str(x).find('模板') < 0, files)) files = list( filter(lambda x: str(x).split('.')[-1] in ['txt', 'json', 'm3u'] and str(x).find('模板') < 0, files)) files = [f'{host_url}lives?path=txt/lives/{file}' for file in files] return files files = '\n'.join(get_lives()) response = make_response(files) response.headers['Content-Type'] = 'text/plain; charset=utf-8' return response