N3RDN/JN/dr_py/json/采集分类生成器.py
2024-07-06 19:57:54 +08:00

227 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File : 采集分类生成器.py
# Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
# Date : 2024/6/21
import os
import json
import gzip
import base64
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor
from pprint import pprint
import time
import requests
import warnings
# 关闭警告
warnings.filterwarnings("ignore")
requests.packages.urllib3.disable_warnings()
pool = ThreadPoolExecutor(max_workers=20) # 初始化线程池内线程数量为20
headers = {
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1',
'Connection': 'close' # 设置为关闭长连接
}
timeout = 5 # 5秒
use_gzip = False
def compress_and_encode(data: str):
# 压缩数据
compressed_data = gzip.compress(data.encode('utf-8'))
# 对压缩数据进行Base64编码
encoded_data = base64.b64encode(compressed_data).decode('utf-8')
return encoded_data
def decode_and_decompress(encoded_data: str):
# 解码Base64数据
decoded_data = base64.b64decode(encoded_data.encode('utf-8'))
# 解压缩数据
decompressed_data = gzip.decompress(decoded_data).decode('utf-8')
return decompressed_data
def get_classes(rec):
classes = None
if rec.get('url') and str(rec['url']).startswith('http'):
_class_api = rec.get('api') or '/api.php/provide/vod/'
_api = urljoin(str(rec['url']).rstrip('/'), _class_api)
# _api = urljoin(rec['url'], '/api.php/provide/vod/at/json')
print(_api)
try:
r = requests.get(_api, headers=headers, timeout=timeout, verify=False)
ret = r.json()
if rec.get('name') == '乐视资源':
print('=======乐视=========')
print(ret)
# print(ret)
classes = ret.get('class')
except Exception as e:
print(f'获取资源【{rec["name"]}】({_api})分类发生错误:{e}')
return classes
def convert_class(classes, name=None):
"""
获取的分类转静态分类格式
@param classes:
@return:
"""
if name is None:
name = ''
if not classes:
return {
"name": "",
"class_name": "",
"class_url": "",
}
class_names = []
class_urls = []
for cls in classes:
if cls.get('type_name') and cls.get('type_id'):
class_urls.append(str(cls['type_id']))
class_names.append(str(cls['type_name']))
global use_gzip
return {
"name": name,
"class_name": compress_and_encode('&'.join(class_names)) if use_gzip else '&'.join(class_names),
"class_url": '&'.join(class_urls),
}
def get_convert_classes(rec):
classes = get_classes(rec)
classes = convert_class(classes, rec.get('name'))
return classes
def check_class(api, type_name, type_id, limit_count=6):
_url = f'{api}?ac=detail&pg=1&t={type_id}'
try:
r = requests.get(_url, headers=headers, timeout=timeout, verify=False)
ret = r.json()
if not ret.get("list") or len(ret["list"]) < limit_count:
print(f'获取资源 {api} 分类【{type_name}】数量为:{len(ret["list"])} 小于{limit_count}视为排除')
return False
except Exception as e:
print(f'获取资源 {_url} 分类【{type_name}】发生错误:{e}')
return True
def check_active(api):
try:
r = requests.get(api, headers=headers, timeout=timeout, verify=False)
ret = r.json()
if not ret.get("class"):
return False
except Exception as e:
print(f'检查api: {api} 存活发生错误:{e}')
return False
return True
def main(fname='采集'):
file_path = f'./{fname}.json'
out_file_path = file_path.replace('.json', '静态.json')
if not os.path.exists(file_path):
exit(f'不存在采集文件路径:{file_path}')
with open(file_path, encoding='utf-8') as f:
data = f.read()
records = json.loads(data)
print(records)
# for rec in records:
# ret = get_convert_classes(rec)
# pprint(ret)
tasks = [pool.submit(get_convert_classes, rec) for rec in records] # 构造一个列表循环向线程池内submit提交执行的方法
pool.shutdown(wait=True) # 线程数等待所有线程结束,这里 卡住主线程
results = [task.result() for task in tasks]
print(results)
new_records = []
for record in records:
rec_name = record["name"]
if rec_name:
has_name = [ret for ret in results if ret.get("name") == rec_name]
if has_name:
record.update(has_name[-1])
new_records.append(record)
pprint(new_records)
print(f'转换静态数据成功记录数:{len(new_records)}')
with open(out_file_path, mode='w+', encoding='utf-8') as f:
f.write(json.dumps(new_records, ensure_ascii=False, indent=2))
def main_exclude(fname='采集静态', max_workers=0):
file_path = f'./{fname}.json'
if not os.path.exists(file_path):
exit(f'不存在采集文件路径:{file_path}')
with open(file_path, encoding='utf-8') as f:
data = f.read()
records = json.loads(data)
if len(records) < 1 or not records[0].get('class_name'):
exit('输入数据有误,疑似不是静态数据')
print(records)
new_records = []
for rec in records:
new_rec = rec.copy()
if rec.get('api'):
api_url = urljoin(rec['url'], rec['api'])
else:
api_url = urljoin(rec['url'], '/api.php/provide/vod/')
print(api_url)
cate_excludes = []
if not check_active(api_url):
print(f'{rec["name"]} ({rec["url"]})视为不存活,跳过分类检测')
else:
class_names = decode_and_decompress(rec['class_name']).split('&')
class_urls = rec['class_url'].split('&')
rec_pool = ThreadPoolExecutor(max_workers=max_workers or len(class_names)) # 初始化线程池内线程数量为分类数量
tasks = []
for i in range(len(class_names)):
type_name = class_names[i]
type_id = class_urls[i]
tasks.append(rec_pool.submit(check_class, api_url, type_name, type_id))
rec_pool.shutdown(wait=True) # 线程数等待所有线程结束,这里 卡住主线程
results = [task.result() for task in tasks]
print(results)
for i in range(len(class_names)):
type_name = class_names[i]
# type_id = class_urls[i]
if not results[i]:
cate_excludes.append(type_name)
if len(cate_excludes) > 0:
new_rec['cate_excludes'] = cate_excludes
new_records.append(new_rec)
with open(file_path, mode='w+', encoding='utf-8') as f:
f.write(json.dumps(new_records, ensure_ascii=False, indent=2))
if __name__ == '__main__':
use_gzip = True
fmode = str(input('请输入处理文件方式(0:生成分类 1:添加分类过滤),留空默认为生成静态分类:\n'))
ftips = '采集静态' if fmode == '1' else '采集'
fname = str(input(f'请输入文件名(q结束程序),留空默认为{ftips}:\n'))
t1 = time.time()
if fname == 'q':
exit('已主动结束脚本')
if not fmode or fmode == '0':
fname = fname or '采集'
main(fname)
elif fmode == '1':
fname = fname or '采集静态'
main_exclude(fname, 10)
else:
exit(f'未知的处理类型:{fmode}')
t2 = time.time()
print(f'本次程序运行耗时:{round(t2 - t1, 2)}')