mirror of
https://github.com/letian1650/N3RD.git
synced 2025-01-25 04:05:17 +08:00
276 lines
9.6 KiB
Python
276 lines
9.6 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
# File : htmlParser.py
|
||
# Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
|
||
# Date : 2022/8/25
|
||
# upDate : 2022/11/17 支持 -- 剔除元素 多个剔除
|
||
|
||
import ujson
|
||
from pyquery import PyQuery as pq
|
||
from urllib.parse import urljoin
|
||
import re
|
||
from jsonpath import jsonpath
|
||
|
||
PARSE_CACHE = True # 解析缓存
|
||
NOADD_INDEX = ':eq|:lt|:gt|:first|:last|^body$|^#' # 不自动加eq下标索引
|
||
URLJOIN_ATTR = '(url|src|href|-original|-src|-play|-url|style)$' # 需要自动urljoin的属性
|
||
SPECIAL_URL = '^(ftp|magnet|thunder|ws):' # 过滤特殊链接,不走urlJoin
|
||
|
||
|
||
class jsoup:
|
||
def __init__(self, MY_URL=''):
|
||
self.MY_URL = MY_URL
|
||
self.pdfh_html = ''
|
||
self.pdfa_html = ''
|
||
|
||
self.pdfh_doc = None
|
||
self.pdfa_doc = None
|
||
|
||
def test(self, text: str, string: str):
|
||
"""
|
||
正则判断字符串包含,模仿js的 //.test()
|
||
:param text:
|
||
:param string:
|
||
:return:
|
||
"""
|
||
searchObj = re.search(rf'{text}', string, re.M | re.I)
|
||
test_ret = True if searchObj else False
|
||
return test_ret
|
||
|
||
def contains(self, text: str, match: str):
|
||
# return match in text
|
||
return text.find(match) > -1
|
||
|
||
def parseHikerToJq(self, parse, first=False):
|
||
"""
|
||
海阔解析表达式转原生表达式,自动补eq,如果传了first就最后一个也取eq(0)
|
||
:param parse:
|
||
:param first:
|
||
:return:
|
||
"""
|
||
if self.contains(parse, '&&'):
|
||
parse = parse.split('&&') # 带&&的重新拼接
|
||
new_parses = [] # 构造新的解析表达式列表
|
||
for i in range(len(parse)):
|
||
ps = parse[i].split(' ')[-1] # 如果分割&&后带空格就取最后一个元素
|
||
if not self.test(NOADD_INDEX, ps):
|
||
if not first and i >= len(parse) - 1: # 不传first且遇到最后一个,不用补eq(0)
|
||
new_parses.append(parse[i])
|
||
else:
|
||
new_parses.append(f'{parse[i]}:eq(0)')
|
||
else:
|
||
new_parses.append(parse[i])
|
||
parse = ' '.join(new_parses)
|
||
else:
|
||
ps = parse.split(' ')[-1] # 如果带空格就取最后一个元素
|
||
if not self.test(NOADD_INDEX, ps) and first:
|
||
parse = f'{parse}:eq(0)'
|
||
|
||
return parse
|
||
|
||
def getParseInfo(self, nparse):
|
||
"""
|
||
根据传入的单规则获取 parse规则,索引位置,排除列表 -- 可以用于剔除元素,支持多个,按标签剔除,按id剔除等操作
|
||
:param nparse:
|
||
:return:
|
||
"""
|
||
excludes = [] # 定义排除列表默认值为空
|
||
nparse_index = 0 # 定义位置索引默认值为0
|
||
nparse_rule = nparse # 定义规则默认值为本身
|
||
if self.contains(nparse, ':eq'):
|
||
nparse_rule = nparse.split(':eq')[0]
|
||
nparse_pos = nparse.split(':eq')[1]
|
||
# print(nparse_rule)
|
||
if self.contains(nparse_rule, '--'):
|
||
excludes = nparse_rule.split('--')[1:]
|
||
nparse_rule = nparse_rule.split('--')[0]
|
||
elif self.contains(nparse_pos, '--'):
|
||
excludes = nparse_pos.split('--')[1:]
|
||
nparse_pos = nparse_pos.split('--')[0]
|
||
try:
|
||
nparse_index = int(nparse_pos.split('(')[1].split(')')[0])
|
||
except:
|
||
pass
|
||
|
||
elif self.contains(nparse, '--'):
|
||
nparse_rule = nparse.split('--')[0]
|
||
excludes = nparse.split('--')[1:]
|
||
|
||
# if nparse_index > 0:
|
||
# print(f'nparse_rule:{nparse_rule},nparse_index:{nparse_index},excludes:{excludes}')
|
||
return nparse_rule, nparse_index, excludes
|
||
|
||
def parseOneRule(self, doc, nparse, ret=None):
|
||
"""
|
||
解析空格分割后的原生表达式中的一条记录,正确处理eq的索引,返回处理后的ret
|
||
:param doc: pq(html) load 后的pq对象
|
||
:param nparse: 当前单个解析表达式
|
||
:param ret: pd对象结果
|
||
:return:
|
||
"""
|
||
nparse_rule, nparse_index, excludes = self.getParseInfo(nparse)
|
||
|
||
if not ret:
|
||
ret = doc(nparse_rule)
|
||
else:
|
||
ret = ret(nparse_rule)
|
||
# print(f'nparse_rule:{nparse_rule},nparse_index:{nparse_index},excludes:{excludes},ret:{ret}')
|
||
if self.contains(nparse, ':eq'):
|
||
ret = ret.eq(nparse_index)
|
||
# if nparse_index > 4:
|
||
# print('nparse_index',ret,not ret)
|
||
|
||
if excludes and ret:
|
||
# print(excludes)
|
||
ret = ret.clone() # 克隆一个,免得直接remove会影响doc的缓存
|
||
for exclude in excludes:
|
||
# ret.remove(exclude)
|
||
ret(exclude).remove()
|
||
return ret
|
||
|
||
def pdfa(self, html, parse: str):
|
||
# 看官方文档才能解决这个问题!!!
|
||
# https://pyquery.readthedocs.io/en/latest/api.html
|
||
if not all([html, parse]):
|
||
return []
|
||
parse = self.parseHikerToJq(parse)
|
||
print(f'pdfa:{parse}')
|
||
if PARSE_CACHE:
|
||
if self.pdfa_html != html:
|
||
self.pdfa_html = html
|
||
self.pdfa_doc = pq(html)
|
||
doc = self.pdfa_doc
|
||
else:
|
||
doc = pq(html)
|
||
|
||
parses = parse.split(' ')
|
||
# print(parses)
|
||
ret = None
|
||
for nparse in parses:
|
||
ret = self.parseOneRule(doc, nparse, ret)
|
||
if not ret: # 可能循环取值后ret 对应eq取完无值了,pdfa直接返回空列表
|
||
return []
|
||
res = [item.outerHtml() for item in ret.items()]
|
||
return res
|
||
|
||
def pdfh(self, html, parse: str, base_url: str = ''):
|
||
if not all([html, parse]):
|
||
return ''
|
||
if PARSE_CACHE:
|
||
if self.pdfh_html != html:
|
||
self.pdfh_html = html
|
||
self.pdfh_doc = pq(html)
|
||
doc = self.pdfh_doc
|
||
else:
|
||
doc = pq(html)
|
||
if parse == 'body&&Text' or parse == 'Text':
|
||
text = doc.text()
|
||
return text
|
||
elif parse == 'body&&Html' or parse == 'Html':
|
||
return doc.html()
|
||
|
||
option = None
|
||
if self.contains(parse, '&&'):
|
||
option = parse.split('&&')[-1]
|
||
parse = '&&'.join(parse.split('&&')[:-1])
|
||
parse = self.parseHikerToJq(parse, True)
|
||
# print(f'pdfh:{parse},option:{option}')
|
||
parses = parse.split(' ')
|
||
# print(parses)
|
||
ret = None
|
||
for nparse in parses:
|
||
ret = self.parseOneRule(doc, nparse, ret)
|
||
# print(nparse,ret)
|
||
if not ret: # 可能循环取值后ret 对应eq取完无值了,pdfh直接返回空字符串
|
||
return ''
|
||
|
||
if option:
|
||
if option == 'Text':
|
||
ret = ret.text()
|
||
elif option == 'Html':
|
||
ret = ret.html()
|
||
else:
|
||
ret = ret.attr(option) or ''
|
||
if self.contains(option.lower(), 'style') and self.contains(ret, 'url('):
|
||
try:
|
||
ret = re.search('url\((.*?)\)', ret, re.M | re.S).groups()[0]
|
||
# 2023/07/28新增 style取内部链接自动去除首尾单双引号
|
||
ret = re.sub(r"^['\"]|['\"]$", '', ret)
|
||
except:
|
||
pass
|
||
if ret and base_url:
|
||
# need_add = re.search(URLJOIN_ATTR, option, re.M | re.I)
|
||
need_add = self.test(URLJOIN_ATTR, option) and not self.test(SPECIAL_URL, ret)
|
||
if need_add:
|
||
if 'http' in ret:
|
||
ret = ret[ret.find('http'):]
|
||
else:
|
||
ret = urljoin(base_url, ret)
|
||
else:
|
||
ret = ret.outerHtml()
|
||
return ret
|
||
|
||
def pd(self, html, parse: str, base_url: str = ''):
|
||
if not base_url:
|
||
base_url = self.MY_URL
|
||
return self.pdfh(html, parse, base_url)
|
||
|
||
def pq(self, html: str):
|
||
return pq(html)
|
||
|
||
def pjfh(self, html, parse: str, add_url=False):
|
||
if not all([html, parse]):
|
||
return ''
|
||
if isinstance(html, str):
|
||
# print(html)
|
||
try:
|
||
html = ujson.loads(html)
|
||
# html = eval(html)
|
||
except:
|
||
print('字符串转json失败')
|
||
return ''
|
||
if not parse.startswith('$.'):
|
||
parse = f'$.{parse}'
|
||
ret = ''
|
||
for ps in parse.split('||'):
|
||
ret = jsonpath(html, ps)
|
||
if isinstance(ret, list):
|
||
ret = str(ret[0]) if ret[0] else ''
|
||
else:
|
||
ret = str(ret) if ret else ''
|
||
if add_url and ret:
|
||
ret = urljoin(self.MY_URL, ret)
|
||
if ret:
|
||
break
|
||
# print(ret)
|
||
return ret
|
||
|
||
def pj(self, html, parse: str):
|
||
return self.pjfh(html, parse, True)
|
||
|
||
def pjfa(self, html, parse: str):
|
||
if not all([html, parse]):
|
||
return []
|
||
if isinstance(html, str):
|
||
try:
|
||
html = ujson.loads(html)
|
||
except:
|
||
return []
|
||
if not parse.startswith('$.'):
|
||
parse = f'$.{parse}'
|
||
# print(html)
|
||
# print(parse)
|
||
ret = jsonpath(html, parse)
|
||
# print(ret)
|
||
# print(type(ret))
|
||
# print(type(ret[0]))
|
||
# print(len(ret))
|
||
if isinstance(ret, list) and isinstance(ret[0], list) and len(ret) == 1:
|
||
# print('自动解包')
|
||
ret = ret[0] # 自动解包
|
||
return ret or []
|
||
|
||
|
||
if __name__ == '__main__':
|
||
pass
|