Commit 437022ac  Author: XveLingKun

0823

Parent 8eb77a4e
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="PublishConfigData" remoteFilesAllowedToDisappearOnAutoupload="false">
<serverData>
<paths name="root@114.115.141.81:22 (2)">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="root@114.116.36.231:22">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="root@114.116.54.108:22">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="zzsn@116.63.135.219:22">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="zzsn@116.63.179.212:22">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="zzsn@116.63.179.212:22 password">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="zzsn@192.168.1.149:22">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="zzsn@192.168.1.149:22 (2)">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="zzsn@192.168.1.149:22 (3)">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
</serverData>
</component>
</project>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/zzsn.iml" filepath="$PROJECT_DIR$/.idea/zzsn.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>
\ No newline at end of file
@@ -48,53 +48,56 @@ def shiftwindow(n):
 def click(type_name, driver):
     # right-click and choose Translate
-    pyautogui.moveTo(500, 400, duration=1)
-    if type_name == '正文':
-        try:
-            if driver.find_element(By.TAG_NAME, 'img').is_displayed():
-                pass
-            else:
-                time.sleep(5)
-        except:
-            pass
-        time.sleep(1)
-        position_elements = driver.find_elements(By.TAG_NAME, 'p')
-        for e in position_elements:
-            driver.execute_script("arguments[0].scrollIntoView();", e)
-            time.sleep(1)
-            try:
-                if e.find_element(By.TAG_NAME, 'a'):
-                    continue
-                else:
-                    break
-            except:
-                break
-        else:
-            pyautogui.moveTo(1500, 900, duration=1)
-            # log.error(f'{type_name}----未找到可点击的元素')
-            # return None
-    else:
-        time.sleep(1)
-        position_elements = driver.find_elements(By.TAG_NAME, 'div')
-        for e in position_elements:
-            try:
-                if e.find_element(By.TAG_NAME, 'a'):
-                    continue
-                else:
-                    break
-            except:
-                break
-        else:
-            log.error(f'{type_name}----未找到可点击的元素')
-            return None
+    # pyautogui.moveTo(500, 400, duration=1)
+    # if type_name == '正文':
+    #     try:
+    #         if driver.find_element(By.TAG_NAME, 'img').is_displayed():
+    #             pass
+    #         else:
+    #             time.sleep(5)
+    #     except:
+    #         pass
+    #     time.sleep(1)
+    #     position_elements = driver.find_elements(By.TAG_NAME, 'p')
+    #     for e in position_elements:
+    #         driver.execute_script("arguments[0].scrollIntoView();", e)
+    #         time.sleep(1)
+    #         try:
+    #             if e.find_element(By.TAG_NAME, 'a'):
+    #                 continue
+    #             else:
+    #                 break
+    #         except:
+    #
+    #             break
+    #     else:
+    #         pyautogui.moveTo(1500, 900, duration=1)
+    #         # log.error(f'{type_name}----未找到可点击的元素')
+    #         # return None
+    # else:
+    #     time.sleep(1)
+    #     position_elements = driver.find_elements(By.TAG_NAME, 'div')
+    #     for e in position_elements:
+    #         try:
+    #             if e.find_element(By.TAG_NAME, 'a'):
+    #                 continue
+    #             else:
+    #                 break
+    #         except:
+    #
+    #             break
+    #     else:
+    #         log.error(f'{type_name}----未找到可点击的元素')
+    #         return None
     rightClick = ActionChains(driver)
     try:
-        rightClick.context_click(e).perform()
+        # rightClick.context_click(e).perform()
+        # right-click via keyboard shortcut
+        pyautogui.hotkey('shift', 'f10')
     except:
-        rightClick.context_click().perform()
+        # rightClick.context_click().perform()
+        pyautogui.hotkey('shift', 'f10')

 @retry(tries=3, delay=1)

@@ -126,17 +129,19 @@ def Translate(type_name, file_name, driver):
     driver.refresh()
     click(type_name, driver)
     time.sleep(1)
-    pyautogui.typewrite(['down'] * 6)
-    pyautogui.typewrite(["enter"])
+    # pyautogui.typewrite(['down'] * 6)
+    # pyautogui.typewrite(["enter"])
+    # translate via keyboard shortcut T
+    pyautogui.hotkey('t')
    time.sleep(1)
    js = "window.scrollTo(0,0)"
    driver.execute_script(js)
    time.sleep(1)
    count_ = 0
-    while driver.find_element(By.TAG_NAME, 'body').text[:500] in flag and count_ < 10:
+    while driver.find_element(By.TAG_NAME, 'body').text[:1000] == flag and count_ < 10:
        time.sleep(2)
        count_ += 1
-    if driver.find_element(By.TAG_NAME, 'body').text[:500] in flag:
+    if driver.find_element(By.TAG_NAME, 'body').text[:1000] == flag:
        log.error(f'{type_name}---翻译加载失败')
        return None
...
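For orientation, the pattern this commit switches to, opening the browser's native context menu with Shift+F10 and then triggering the Translate entry with a key press, can be sketched like this (a minimal sketch; the 't' access key for Edge's Translate menu item is taken from the diff above, not verified for every Edge build):

import time
import pyautogui
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By

def translate_page(driver):
    # focus the page body so the context menu opens over the content
    body = driver.find_element(By.TAG_NAME, 'body')
    ActionChains(driver).move_to_element(body).perform()
    pyautogui.hotkey('shift', 'f10')  # open the context menu without mouse coordinates
    time.sleep(1)
    pyautogui.hotkey('t')             # select the Translate entry by its access key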
@@ -499,11 +499,12 @@ def doJob():
                 continue
             else:
                 pass
-            # response = requests.post('http://114.115.236.206:8088/sync/executive', data=json_updata, timeout=300,
-            #                          verify=False)
-            response = requests.post('http://114.115.218.248:9292/userserver/sync/executive', data=json_updata, timeout=300,
+            response = requests.post('http://114.115.236.206:8088/sync/executive', data=json_updata, timeout=300,
+                                     verify=False)
+            response_ = requests.post('http://114.116.116.241:9098/userserver/sync/executive', data=json_updata, timeout=300,
                                      verify=False)
             print(response.text)
+            print(response_.text)
             log.info('=========成功======')
             token.updateTokeen(id_cookie, 3)
             # time.sleep(randint(5,10))
...
@@ -179,6 +179,23 @@ class Info():
         print(result)
         pass

+from selenium import webdriver
+
+class Driver():
+    def create_driver(self):
+        path = r'D:\soft\msedgedriver.exe'
+        # options = webdriver.EdgeOptions()
+        options = {
+            "browserName": "MicrosoftEdge",
+            "ms:edgeOptions": {
+                "extensions": [], "args": ["--start-maximized"]  # start the window maximized
+            }
+        }
+        session = webdriver.Edge(executable_path=path, capabilities=options)
+        return session
+
 if __name__ == '__main__':
     # token = Token()
     # print(token.get_cookies())
...
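As an aside, the `executable_path` and `capabilities` keyword arguments used in create_driver were removed in Selenium 4; a rough Selenium 4 equivalent (a sketch, reusing the same driver path) would be:

from selenium import webdriver
from selenium.webdriver.edge.options import Options
from selenium.webdriver.edge.service import Service

def create_driver_v4():
    options = Options()
    options.add_argument('--start-maximized')  # start the window maximized
    service = Service(executable_path=r'D:\soft\msedgedriver.exe')
    return webdriver.Edge(service=service, options=options)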
@@ -13,6 +13,7 @@ import requests
 from bs4 import BeautifulSoup
 from kafka import KafkaProducer
 from retry import retry
+from selenium.webdriver.common.by import By
 from base import BaseCore
 from obs import ObsClient

@@ -376,6 +377,21 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_name,num):
         log.info(dic_result)
         return False

+from selenium import webdriver
+
+def create_driver():
+    path = r'D:\soft\msedgedriver.exe'
+    # options = webdriver.EdgeOptions()
+    options = {
+        "browserName": "MicrosoftEdge",
+        "ms:edgeOptions": {
+            "extensions": [], "args": ["--start-maximized"]  # start the window maximized
+        }
+    }
+    session = webdriver.Edge(executable_path=path, capabilities=options)
+    return session
+
 # collect data
 def SpiderByZJH(url, payload, dic_info, start_time,num):  # dic_info: basic info fetched from the database
     social_code = dic_info[2]

@@ -488,6 +504,13 @@ def SpiderByZJH(url, payload, dic_info, start_time,num):
             # if it does not exist, ifexist = True
             # ifexist = True
             if ifexist:
+                # todo: check whether the pdf_url request succeeded
+                driver.get(pdf_url)
+                page_source = driver.page_source
+                if "访问验证" in page_source:
+                    driver.find_element(By.XPATH, "/html/body/div[1]/div[1]/div/div[2]/div/div/div/div/span")
+
                 # parse the PDF: first get the link, download it; parse success/failure, transfer success/failure
                 result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time,com_name,num)

@@ -541,6 +564,7 @@ if __name__ == '__main__':
         'Upgrade-Insecure-Requests': '1',
         'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'
     }
+    driver = create_driver()
     dic_parms = {}
     # read stock code, short name, and social credit code from the database

@@ -592,7 +616,7 @@ if __name__ == '__main__':
                 count += 1
                 runType = 'NoticeReportCount'
                 baseCore.updateRun(social_code, runType, count)
-            break
+            # break
     cursor.close()
     cnx.close()
     baseCore.close()
@@ -34,35 +34,46 @@ def updatewxLink(link,info_source_code,state):
 def getjsonInfo():
     # todo: pull one entry from redis
-    linkid = baseCore.redicPullData('WeiXinGZH:linkid')
-    if linkid:
-        pass
-    else:
-        log.info('-----没有数据了-----')
-        return False, False
-    # fetch one record from the database
-    select_sql = f"select * from wx_link where state=0 and id= '{linkid}'"
-    cursor_.execute(select_sql)
-    row = cursor_.fetchone()
-    cnx_.commit()
-    if row:
-        pass
-    else:
-        log.info('-----没有数据了-----')
-        return False, False
+    # linkid = baseCore.redicPullData('WeiXinGZH:linkid')
+    # for testing
+    linkid = True
+    # if linkid:
+    #     pass
+    # else:
+    #     log.info('-----没有数据了-----')
+    #     return False, False
+    # # fetch one record from the database
+    # select_sql = f"select * from wx_link where state=0 and id= '{linkid}'"
+    # cursor_.execute(select_sql)
+    # row = cursor_.fetchone()
+    # cnx_.commit()
+    # if row:
+    #     pass
+    # else:
+    #     log.info('-----没有数据了-----')
+    #     return False, False
+    # dict_json = {
+    #     'sid':row[1],
+    #     'site_uri':row[2],
+    #     'site_name':row[3],
+    #     'info_source_code':row[4],
+    #     'title':row[5],
+    #     'publish_time':row[6],
+    #     'link':row[7]
+    # }
     dict_json = {
-        'sid':row[1],
-        'site_uri':row[2],
-        'site_name':row[3],
-        'info_source_code':row[4],
-        'title':row[5],
-        'publish_time':row[6],
-        'link':row[7]
+        'sid': 111,
+        'site_uri': "",
+        'site_name': "",
+        'info_source_code': "",
+        'title': "测试",
+        'publish_time': "",
+        'link': "https://mp.weixin.qq.com/s?__biz=MjM5NDgyMTI2NQ==&mid=2648261170&idx=3&sn=13c1bb102bdaa16b1d5e6eda2c189d0f&chksm=beac07cc89db8eda0a134dc9811d0c12c6a58ea4fbd6514f51ea685abd5cee748064aec6e650&token=1828158456&lang=zh_CN#rd"
     }
-    # got one record: update its state
-    update_sql = f"update wx_link set state=1 where link='{row[7]}' and info_source_code='{row[4]}' "
-    cursor_.execute(update_sql)
-    cnx_.commit()
+    # # got one record: update its state
+    # update_sql = f"update wx_link set state=1 where link='{row[7]}' and info_source_code='{row[4]}' "
+    # cursor_.execute(update_sql)
+    # cnx_.commit()
     return dict_json, linkid

 @retry(tries=20, delay=2)

@@ -215,7 +226,7 @@ def get_info(dict_json, linkid):
     list_section = news_html.find_all('section')
     for section in list_section:
         section.name = 'div'
+    news_html = deletek(news_html)
     time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
     dic_info = {
         'sid': sid,
@@ -293,6 +304,42 @@ def rm_style_attr(soup):
     return soup

+# def deletek(soup):
+#     # remove blank tags (e.g. <p></p>, <p><br></p>); img, video, br excluded
+#     for i in soup.find_all(lambda tag: len(tag.get_text()) == 0 and tag.name not in ["img", "video", "br"] and tag.name != "br" or tag.get_text()==' '):
+#         for j in i.descendants:
+#             if j.name in ["img", "video", "br"]:
+#                 break
+#         else:
+#             i.decompose()
+#
+#     return soup
+
+def deletek(soup):
+    # remove blank tags (e.g. <p></p>, <p><br></p>); img and video excluded
+    for i in soup.find_all(lambda tag: len(tag.get_text(strip=True)) == 0 and tag.name not in ["img", "video"]):
+        if i.name == "p":
+            # check whether the <p> tag contains only <br>
+            only_br = True
+            for child in i.children:
+                if child.name and child.name != "br":
+                    only_br = False
+                    break
+            if only_br:
+                i.decompose()
+        else:
+            # check whether the tag contains img or video descendants
+            contains_img_or_video = False
+            for child in i.descendants:
+                if child.name in ["img", "video"]:
+                    contains_img_or_video = True
+                    break
+            if not contains_img_or_video:
+                i.decompose()
+    return soup
+
 if __name__=="__main__":
     num_caiji = 0
     list_all_info = []
...
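A quick usage sketch of the reworked deletek above (hypothetical HTML, just to show which blank tags survive):

from bs4 import BeautifulSoup

html = '<div><p></p><p>  </p><p><img src="a.png"/></p><p>text</p><span></span></div>'
soup = BeautifulSoup(html, 'html.parser')
print(deletek(soup))
# -> <div><p><img src="a.png"/></p><p>text</p></div>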
import asyncio
import time
import playwright
from playwright.async_api import async_playwright
from urllib.parse import urlparse, parse_qs
from base import BaseCore
baseCore = BaseCore.BaseCore(sqlflg=False)
log = baseCore.getLogger()
webid = None
msToken = None
cookies = None
def handle_request(request):
url = request.url
if url.startswith('https://www.douyin.com/aweme/v1/web/user/profile/other/'):
parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query)
global webid
webid = query_params['webid'][0]
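handle_request lifts webid out of the intercepted profile request's query string; the same parsing on a hypothetical URL looks like:

from urllib.parse import urlparse, parse_qs

# hypothetical intercepted URL
u = 'https://www.douyin.com/aweme/v1/web/user/profile/other/?webid=123&msToken=abc'
print(parse_qs(urlparse(u).query)['webid'][0])  # -> '123'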
async def login(page):
await page.wait_for_selector('.web-login-tab-list__item')
await page.click('.web-login-tab-list__item')
await page.fill('.web-login-normal-input__input', '16603863075')
await page.fill('.web-login-button-input__input', 'liu1230...')
await page.click('.web-login-button')
# check every second whether webid has been captured
async def check_webid():
while True:
if webid is not None:
break
await asyncio.sleep(1)
async def get_ttwid_and_webid():
url = 'https://www.douyin.com/user/MS4wLjABAAAAEpmH344CkCw2M58T33Q8TuFpdvJsOyaZcbWxAMc6H03wOVFf1Ow4mPP94TDUS4Us'
# url = 'https://www.douyin.com/'
headless = False
log.info('如出现验证过程,请手动验证')
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=headless,
args=[
'--disable-blink-features=AutomationControlled',
],
channel='chrome'
)
page = await browser.new_page()
page.set_default_timeout(90000)
page.on("request", lambda request: handle_request(request=request))
await page.goto(url)
await page.wait_for_selector('.web-login-tab-list__item')
buttons = await page.query_selector_all('.web-login-tab-list__item')
await buttons[2].click()
await page.fill('.web-login-normal-input__input', '13273737131')
await page.fill('.web-login-button-input__input', 'liu1230...')
await page.click('.web-login-button')
await asyncio.sleep(3)
while True:
try:
flg = await page.locator('.web-login-tab-list__item').count()
log.info(flg)
if flg:
await page.wait_for_timeout(10)
await asyncio.sleep(3)
continue
else:
break
except Exception as e:
log.info(e)
await page.wait_for_load_state('load')
await asyncio.sleep(3)
await check_webid()
page_cookies = await page.context.cookies()
await browser.close()
global cookies
cookies = {}
for cookie in page_cookies:
cookies[cookie['name']] = cookie['value']
if cookie['name'] == 'msToken':
global msToken
msToken = cookie['value']
def get_new_cookies():
asyncio.run(get_ttwid_and_webid())
return {
'webid': webid,
'msToken': msToken,
'cookies': cookies,
}
if __name__ == '__main__':
print(get_new_cookies())
import asyncio
import getpass
import os
import re
import aiofiles
from playwright.async_api import async_playwright
from urllib.parse import urlparse
from base import BaseCore
baseCore = BaseCore.BaseCore(sqlflg=False)
log = baseCore.getLogger()
def check_path(path):
if not os.path.exists(path):
return False
return True
def get_socket_template():
return """
const socket_dy = new WebSocket('ws://localhost:port');
socket_dy.onopen = (event) => {
console.log('WebSocket connection opened:', event);
};
socket_dy.onmessage = (event) => {
console.log('Received message:', event.data);
};
socket_dy.onclose = (event) => {
console.log('WebSocket connection closed:', event);
};
function sendMessage(res_dy) {
socket_dy.send(JSON.stringify(res_dy));
};
"""
def get_emit_template():
return """
,window.res_dy = replacement_1,
!function () {
if(typeof(replacement_1[0]) == "string"){
return
}
console.log(window.res_dy);
if(replacement_2 == "#sync#ChatMessage" || et== "#sync#GiftMessage"){
sendMessage(window.res_dy);
}
}()
"""
js_dict = None
pattern_1 = re.compile('null==\(.*?=this\._debug\)\|\|.*?\.call\(this,.*?,\.\.\.(.*?)\),this')
pattern_2 = re.compile('\.call\(this,"emit_error",(.*?),(.*?)\)')
async def handle_response(response):
url = response.url
if url.endswith('.js'):
js_text = await response.text()
if 'emit_error' in js_text:
js_name = urlparse(url).path.split('/')[-1]
variable_1 = re.findall(pattern_2, js_text)[0][0]
variable_2 = re.findall(pattern_2, js_text)[0][1]
replacement = re.search(pattern_1, js_text).group()
emit_template = get_emit_template()
insert = emit_template.replace('replacement_1', variable_1).replace('replacement_2', variable_2)
insert = replacement + insert
js_text = js_text.replace(replacement, insert)
global js_dict
js_dict = {
'name': js_name,
'text': js_text
}
async def check_js():
while True:
if js_dict is not None:
break
await asyncio.sleep(1)
async def getJS():
url = 'https://live.douyin.com/567973675942'
    USER_DIR_PATH = f"C:\\Users\\{getpass.getuser()}\\AppData\\Local\\Google\\Chrome\\User Data"
headless = True
async with async_playwright() as p:
browser = await p.chromium.launch_persistent_context(
user_data_dir=USER_DIR_PATH,
headless=headless,
args=[
'--disable-blink-features=AutomationControlled',
],
channel='chrome'
)
page = await browser.new_page()
page.on("response", lambda request: handle_response(response=request))
await page.goto(url)
await check_js()
await browser.close()
async def get_new_JS(port=9999):
await getJS()
socket_template = get_socket_template()
socket_template = socket_template.replace('port', str(port))
js_dict['text'] = socket_template + js_dict['text']
return js_dict
async def save_new_JS(port=9999):
js_dict = await get_new_JS(port)
path = f'../static/{js_dict["name"]}'
exist = check_path(path)
if not exist:
async with aiofiles.open(path, 'w', encoding='utf-8') as f:
await f.write(js_dict['text'])
log.info(f'抖音JS文件更新,已保存到{path}')
else:
log.info(f'抖音JS文件无需更新')
return js_dict["name"], path
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(save_new_JS())
'''Fetch the videos posted by a user'''
import datetime
import json
import time
import pymysql
import redis
import requests
from kafka import KafkaProducer
from retry import retry
from dy_utils.dy_util import norm_str, get_headers, get_list_params, splice_url, handle_list_video_info_each, js, check_info, download_image, download_video
from profile import Profile
from base import BaseCore
baseCore = BaseCore.BaseCore(sqlflg=False)
log = baseCore.getLogger()
class Home:
def __init__(self, info=None):
if info is None:
self.info = check_info()
else:
self.info = info
self.profile = Profile(self.info)
self.list_url = "https://www.douyin.com/aweme/v1/web/aweme/post/"
self.headers = get_headers()
self.cnx = pymysql.connect(host='192.168.12.114', port=3306, user='root', password='@zzsn9988', db='clb_project', charset='utf8mb4')
self.cursor = self.cnx.cursor()
self.r = redis.Redis(host="192.168.12.114", port=6379, password='zzsn9988', db=3)
    # update a post's engagement attributes
@retry(tries=2, delay=5)
def postData(self, video, sid):
dic_info = {
            'sid': sid,  # source sid
            'author': video.nickname,  # author
            'articleId': f'douyin{video.awemeId}',  # unique post id
            'likeNum': int(video.digg_count),  # like count
            'collectNum': int(video.collect_count),  # favorite count
            'shareNum': int(video.share_count),  # share count
            'commentNum': int(video.comment_count),  # comment count
}
headers = {
'Content-Type': 'application/json;charset=utf-8',
}
dataJson = json.dumps(dic_info)
req = requests.post('http://192.168.12.111:9988/serviceProject/anon/api/updateZpStatistics',
data=dataJson, timeout=300, verify=False,
headers=headers)
if req.status_code == 200:
log.info(f"{dic_info['articleId']}数据更新成功")
else:
log.info(req.status_code)
log.error(f"{dic_info['articleId']}数据更新失败")
    # dedupe: for an already-collected post, only push the engagement attributes
def is_member_containing_string(self, set_name, string):
cursor = '0'
while True:
            # iterate the set with the SCAN command
cursor, members = self.r.sscan(f'douyinSpider:{set_name}', cursor)
for member in members:
                # check whether the member contains the given string
if string in member.decode("utf-8"):
return True
if cursor == b'0' or cursor == 0:
break
return False
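Note that is_member_containing_string pays a full SCAN over the set because it matches by substring; since save_videos_info stores the exact href with sadd, a constant-time check is possible when exact matching is enough (a sketch of an alternative method for this class, not what the code above does):

    def is_member_exact(self, set_name, href):
        # O(1) membership test, valid only when members are stored as the exact href
        return self.r.sismember(f'douyinSpider:{set_name}', href)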
    # send to kafka
def sendKafka(self, dic_news):
try:
producer = KafkaProducer(bootstrap_servers=['192.168.12.114:9092'], sasl_plain_username='zzsn',
sasl_plain_password='@zzsn9988', sasl_mechanism="PLAIN",
security_protocol='SASL_PLAINTEXT')
kafka_result = producer.send("crawlerInfo",
json.dumps(dic_news, ensure_ascii=False).encode('utf-8'))
# print(kafka_result.get(timeout=10))
log.info(f"{dic_news['sourceAddress']}===传输成功")
return True
except Exception as e:
log.error(f"{dic_news['sourceAddress']}===传输失败==={e}")
return False
    # user home page
def save_all_video_info(self, url, sid):
        # save the user's basic info
userId = url.split('/user/')[1]
profile = self.profile.get_profile_info(url)
        # user nickname
nickname = norm_str(profile.nickname)
sec_user_id = profile.sec_uid
max_cursor = '0'
timeFlg = 0
while True:
params = get_list_params()
params['webid'] = self.info['webid']
params['msToken'] = self.info['msToken']
params['max_cursor'] = max_cursor
params['sec_user_id'] = sec_user_id
splice_url_str = splice_url(params)
xs = js.call('get_dy_xb', splice_url_str)
params['X-Bogus'] = xs
post_url = self.list_url + '?' + splice_url(params)
response = requests.get(post_url, headers=self.headers, cookies=self.info['cookies'])
res_json = response.json()
max_cursor = str(res_json['max_cursor'])
has_more = res_json['has_more']
            # check whether there are more pages
            # when has_more is 0, this page is the last one
for item in res_json['aweme_list']:
video_detail = handle_list_video_info_each(item)
if video_detail.upload_time < '2023-08-01 00:00:00':
timeFlg += 1
continue
if self.is_member_containing_string(userId, video_detail.href):
self.postData(video_detail, sid)
continue
else:
self.save_videos_info(video_detail, sid, nickname, userId)
time.sleep(3)
            # Douyin allows at most three pinned videos
if timeFlg > 4:
break
if has_more == 0:
break
log.info(f'用户 {profile.nickname} 全部视频信息保存成功')
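save_all_video_info is a standard cursor pagination loop: carry max_cursor forward and stop when has_more is 0 (plus the timeFlg guard for the pinned, out-of-range videos). The skeleton, stripped of the Douyin specifics:

def paginate(fetch_page):
    # fetch_page(cursor) -> (items, next_cursor, has_more)
    cursor = '0'
    while True:
        items, cursor, has_more = fetch_page(cursor)
        for item in items:
            yield item
        if has_more == 0:
            break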
    # helper for saving a post's info
def save_videos_info(self, video, sid, userName, userId):
try:
title = norm_str(video.title)
            # this branch downloads image-text posts
if len(video.images) > 0:
attachmentIds, contentWithTag = download_image(video, userId, self.cnx)
if not contentWithTag:
log.info('采集失败')
return ''
content = video.preview_title
downLoadUrl = ''
videoUrl = ''
videoTime = ''
videoImg = ''
            # download the video cover and the video itself
else:
attachmentIds = None
content = None
contentWithTag = None
videoUrl = video.video_addr
videoTime = video.video_time
downLoadUrl, videoImg = download_video(video, userId, self.cnx)
if not downLoadUrl:
log.info('采集失败')
return ''
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            dic_info = {
                'sid': sid,  # source sid
                'title': title,  # title
                'source': 24,
                'publishDate': video.upload_time,
                'sourceAddress': video.href,
                'origin': '抖音',
                'createDate': now,
                'author': userName,
                'lang': 'zh',
                'deleteFlag': 0,
                'content': content,
                'contentWithTag': str(contentWithTag),
                'articleId': f'douyin{video.awemeId}',  # unique post id
                'likeNum': video.digg_count,  # like count
                'collectNum': video.collect_count,  # favorite count
                'shareNum': video.share_count,  # share count
                'commentNum': video.comment_count,  # comment count
                'downLoadUrl': downLoadUrl,  # minio link of the video
                'videoUrl': videoUrl,  # original video link
                'videoTime': videoTime,  # video duration
                'videoImg': videoImg,  # first frame of the video
                'attachmentIds': attachmentIds,
            }
updateFlg = self.sendKafka(dic_info)
if updateFlg:
self.r.sadd(f'douyinSpider:{userId}', video.href)
log.info(f'保存成功')
log.info('============================================================================================================')
except Exception as e:
log.error(f'保存失败==={e}')
log.info('============================================================================================================')
def doJob(self, url_list, sid):
for url in url_list:
try:
self.save_all_video_info(url, sid)
except Exception as e:
log.error(f'用户 {url} 查询失败==={e}')
if __name__ == '__main__':
# home = Home()
# url_list = [
# 'https://www.douyin.com/user/MS4wLjABAAAA0HygX4vUM1_RyZZbwCAP6jlFQ9DnXO6LvnnB_2guQQg'
# ]
# home.doJob(url_list, 'aaaaa')
a = open(r'./static/dy.js', 'r', encoding='gb18030').read()
print(a)
# 'https://www.douyin.com/user/MS4wLjABAAAAigSKToDtKeC5cuZ3YsDrHfYuvpLqVSygIZ0m0yXfUAI',
# # 'https://www.douyin.com/user/MS4wLjABAAAAp2OG100fRV13HqBbRnbPM_l7DU0eTOaxgL-4_l07fQo',
class User_Detail():
def __init__(self, id, sec_uid, nickname, author_avatar, desc, following_count, follower_count, total_favorited, aweme_count, unique_id, user_age, gender, ip_location):
self.id = id
self.sec_uid = sec_uid
self.nickname = nickname
self.author_avatar = author_avatar
self.desc = desc
self.following_count = following_count
self.follower_count = follower_count
self.total_favorited = total_favorited
self.aweme_count = aweme_count
self.unique_id = unique_id
self.user_age = user_age
self.gender = gender
self.ip_location = ip_location
def __str__(self):
        # one value per line
return f'id: {self.id}\n' \
f'sec_uid: {self.sec_uid}\n' \
f'nickname: {self.nickname}\n' \
f'author_avatar: {self.author_avatar}\n' \
f'desc: {self.desc}\n' \
f'following_count: {self.following_count}\n' \
f'follower_count: {self.follower_count}\n' \
f'total_favorited: {self.total_favorited}\n' \
f'aweme_count: {self.aweme_count}\n' \
f'unique_id: {self.unique_id}\n' \
f'user_age: {self.user_age}\n' \
f'gender: {self.gender}\n' \
f'ip_location: {self.ip_location}\n'
class Video_Detail():
def __init__(self, id, awemeId, sec_uid, nickname, author_avatar, video_cover, title, desc, digg_count, comment_count, collect_count, share_count, video_addr, images, upload_time, href,
preview_title, video_time):
self.id = id
self.awemeId = awemeId
self.sec_uid = sec_uid
self.nickname = nickname
self.author_avatar = author_avatar
self.video_cover = video_cover
self.title = title
self.desc = desc
self.digg_count = digg_count
self.comment_count = comment_count
self.collect_count = collect_count
self.share_count = share_count
self.video_addr = video_addr
self.images = images
self.upload_time = upload_time
self.href = href
self.preview_title = preview_title
self.video_time = video_time
def __str__(self):
        # one value per line
return f'id: {self.id}\n' \
f'awemeId: {self.awemeId}\n' \
f'sec_uid: {self.sec_uid}\n' \
f'nickname: {self.nickname}\n' \
f'author_avatar: {self.author_avatar}\n' \
f'video_cover: {self.video_cover}\n' \
f'title: {self.title}\n' \
f'desc: {self.desc}\n' \
f'digg_count: {self.digg_count}\n' \
f'comment_count: {self.comment_count}\n' \
f'collect_count: {self.collect_count}\n' \
f'share_count: {self.share_count}\n' \
f'video_addr: {self.video_addr}\n' \
f'images: {self.images}\n' \
f'upload_time: {self.upload_time}\n'
'''Fetch a user's basic profile info'''
import requests
from dy_utils.dy_util import js, get_headers, get_profile_params, splice_url, handle_profile_info, check_info
class Profile:
def __init__(self, info=None):
if info is None:
self.info = check_info()
else:
self.info = info
self.headers = get_headers()
self.profile_url = "https://www.douyin.com/aweme/v1/web/user/profile/other/"
    # profile home page
def get_profile_info(self, url):
        # the user's unique id
sec_user_id = url.split('/')[-1]
        # request parameters
params = get_profile_params()
params['webid'] = self.info['webid']
params['msToken'] = self.info['msToken']
params['sec_user_id'] = sec_user_id
splice_url_str = splice_url(params)
xs = js.call('get_dy_xb', splice_url_str)
params['X-Bogus'] = xs
post_url = self.profile_url + '?' + splice_url(params)
response = requests.get(post_url, headers=self.headers, cookies=self.info['cookies'])
profile_json = response.json()
profile = handle_profile_info(profile_json)
return profile
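A usage sketch for the class above (the sec_user_id tail of the URL is a placeholder):

if __name__ == '__main__':
    profile = Profile()
    # hypothetical user URL; the last path segment is the sec_user_id
    user = profile.get_profile_info('https://www.douyin.com/user/MS4wLjABAAAAexample')
    print(user.nickname, user.follower_count)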
{"webid": "7371728745382282790", "msToken": "QCcS_H7cJBoVqiIuJAik-0SGkDxL7QSakvIO25NAHOOaZ4X_Sg_ee1kb8S6NwxVYU29jI3Wbi6OLjYnS-LXavHAdnCFF8i1vKZjawwHcAsDl-ahgSuiBsZNRjC-vow==", "cookies": {"": "douyin.com", "device_web_cpu_core": "8", "device_web_memory_size": "8", "architecture": "amd64", "__ac_nonce": "0664da44b001a7749a7d5", "__ac_signature": "_02B4Z6wo00f01SPfmpgAAIDCHnaABj8vmQkj.54AAC6935", "ttwid": "1%7CKsZkClcqllTIXO0u68EG6hrcf8_wsEwhBxtRgMADE90%7C1716364364%7Cb23d4093caf7167226f4829d671a28bc7683e40b7f1f42910009de10ffa17dd9", "home_can_add_dy_2_desktop": "%220%22", "dy_swidth": "1280", "dy_sheight": "720", "csrf_session_id": "ccc3d4c6f69bc6eb6b9845f99a1da730", "FORCE_LOGIN": "%7B%22videoConsumedRemainSeconds%22%3A180%7D", "s_v_web_id": "verify_lwhj11br_BmEab2u3_Jb0i_4PM5_8jh9_CjSePwpcsgpB", "strategyABtestKey": "%221716364367.378%22", "passport_csrf_token": "b47947b7dfe896f0ea03be7bc8d224f0", "passport_csrf_token_default": "b47947b7dfe896f0ea03be7bc8d224f0", "bd_ticket_guard_client_web_domain": "2", "fg_uid": "RID202405221552512CE8995B8BCA85FC741E", "d_ticket": "858f9eeb8eabfe88e4b43775f93e88cdacd5c", "passport_assist_user": "CkDJSHfRvsBYEK3Yw8dYGwpfDn3Xj_usp8SXZQP_rPogM8pYOvbS4j5JdsVzPBkdTd8H8dhfnQuEHuDxizrxhuzLGkoKPF12huKRF4zzS86KEDNboY5SQgOICJFuaFHY0AOfJX9sFD2JJoGJOftriznTdtgdAJjHrJIV-2lqKwATRhD0_NENGImv1lQgASIBA439Ojc%3D", "n_mh": "3KhW9EaK0uD4c9FrD5FdzZcy9Ii88QbiNhZs5Ma1i6A", "sso_auth_status": "cacf16d4ad579c220e90b04477b29876", "sso_auth_status_ss": "cacf16d4ad579c220e90b04477b29876", "sso_uid_tt": "dbfc0fcdeb818ad746742705bc257e5c", "sso_uid_tt_ss": "dbfc0fcdeb818ad746742705bc257e5c", "toutiao_sso_user": "9172a2bbd9ee01b0b04398ff51655783", "toutiao_sso_user_ss": "9172a2bbd9ee01b0b04398ff51655783", "sid_ucp_sso_v1": "1.0.0-KDlmNjJkMzkxN2I1NzJkNDQwMWQyODY0YmJhMDJlMTg2MmY2Yzc0MjcKHwjj1pDk4c2sARDjyLayBhjvMSAMMP-Rh6cGOAJA8QcaAmhsIiA5MTcyYTJiYmQ5ZWUwMWIwYjA0Mzk4ZmY1MTY1NTc4Mw", "ssid_ucp_sso_v1": "1.0.0-KDlmNjJkMzkxN2I1NzJkNDQwMWQyODY0YmJhMDJlMTg2MmY2Yzc0MjcKHwjj1pDk4c2sARDjyLayBhjvMSAMMP-Rh6cGOAJA8QcaAmhsIiA5MTcyYTJiYmQ5ZWUwMWIwYjA0Mzk4ZmY1MTY1NTc4Mw", "passport_auth_status": "8e05741afb3e010e77e163fda72fb76f%2C952ba94f0a91783b5a2e4b502a88d4f3", "passport_auth_status_ss": "8e05741afb3e010e77e163fda72fb76f%2C952ba94f0a91783b5a2e4b502a88d4f3", "uid_tt": "44e3e86934f55be62758ee40da8a2f5b", "uid_tt_ss": "44e3e86934f55be62758ee40da8a2f5b", "sid_tt": "c268f8173fa88fbeedb20d448b86b065", "sessionid": "c268f8173fa88fbeedb20d448b86b065", "sessionid_ss": "c268f8173fa88fbeedb20d448b86b065", "IsDouyinActive": "true", "stream_recommend_feed_params": "%22%7B%5C%22cookie_enabled%5C%22%3Atrue%2C%5C%22screen_width%5C%22%3A1280%2C%5C%22screen_height%5C%22%3A720%2C%5C%22browser_online%5C%22%3Atrue%2C%5C%22cpu_core_num%5C%22%3A8%2C%5C%22device_memory%5C%22%3A8%2C%5C%22downlink%5C%22%3A10%2C%5C%22effective_type%5C%22%3A%5C%224g%5C%22%2C%5C%22round_trip_time%5C%22%3A100%7D%22", "publish_badge_show_info": "%220%2C0%2C0%2C1716364389260%22", "FOLLOW_NUMBER_YELLOW_POINT_INFO": "%22MS4wLjABAAAAymbfzWAxITJU71cGScIM2POKocFH5U5Yalo1TIuCGEs%2F1716393600000%2F0%2F1716364389515%2F0%22", "volume_info": "%7B%22isUserMute%22%3Afalse%2C%22isMute%22%3Afalse%2C%22volume%22%3A0.5%7D", "stream_player_status_params": "%22%7B%5C%22is_auto_play%5C%22%3A0%2C%5C%22is_full_screen%5C%22%3A0%2C%5C%22is_full_webscreen%5C%22%3A0%2C%5C%22is_mute%5C%22%3A0%2C%5C%22is_speed%5C%22%3A1%2C%5C%22is_visible%5C%22%3A1%7D%22", "passport_fe_beating_status": "true", "_bd_ticket_crypt_doamin": "2", "_bd_ticket_crypt_cookie": 
"a7adf2f3bd8c67fdeeda5991a202f6bf", "__security_server_data_status": "1", "bd_ticket_guard_client_data": "eyJiZC10aWNrZXQtZ3VhcmQtdmVyc2lvbiI6MiwiYmQtdGlja2V0LWd1YXJkLWl0ZXJhdGlvbi12ZXJzaW9uIjoxLCJiZC10aWNrZXQtZ3VhcmQtcmVlLXB1YmxpYy1rZXkiOiJCTEw3NmUzdTJOYjVFUEVHdW1YbzVVNzZhM2RnZFFxZDJDWXFWaDlUd3lTMGhPOEdPTGRWaVdxK2hQTlJnbis4TWQ0KzkwSGVhWW5RRWZsYXhxWk5lQXc9IiwiYmQtdGlja2V0LWd1YXJkLXdlYi12ZXJzaW9uIjoxfQ%3D%3D", "sid_guard": "c268f8173fa88fbeedb20d448b86b065%7C1716364391%7C5183999%7CSun%2C+21-Jul-2024+07%3A53%3A10+GMT", "sid_ucp_v1": "1.0.0-KDgxYzNmNjlmNzYwNmE3ZGQ4Mzc5YmM2MTdkYTQ0ZWU2YTc4MDMwZTEKGwjj1pDk4c2sARDnyLayBhjvMSAMOAJA8QdIBBoCbHEiIGMyNjhmODE3M2ZhODhmYmVlZGIyMGQ0NDhiODZiMDY1", "ssid_ucp_v1": "1.0.0-KDgxYzNmNjlmNzYwNmE3ZGQ4Mzc5YmM2MTdkYTQ0ZWU2YTc4MDMwZTEKGwjj1pDk4c2sARDnyLayBhjvMSAMOAJA8QdIBBoCbHEiIGMyNjhmODE3M2ZhODhmYmVlZGIyMGQ0NDhiODZiMDY1", "msToken": "QCcS_H7cJBoVqiIuJAik-0SGkDxL7QSakvIO25NAHOOaZ4X_Sg_ee1kb8S6NwxVYU29jI3Wbi6OLjYnS-LXavHAdnCFF8i1vKZjawwHcAsDl-ahgSuiBsZNRjC-vow==", "odin_tt": "038382482dda72d4055c0779766cf69e8874c074e8c352392663adb14c308cb8b5de02eadd4ed942a5db0186e1f17808"}}
\ No newline at end of file
# -*- coding: utf-8 -*-
import datetime
import json
import time
import pandas as pd
import pymongo
import requests
from classtool import sendData
from base import BaseCore
baseCore = BaseCore.BaseCore()
headers = {
'Accept': 'application/json, text/plain, */*',
'Accept-Encoding': 'gzip, deflate, br, zstd',
'Accept-Language': 'zh-CN,zh-TW;q=0.9,zh;q=0.8',
'Connection': 'keep-alive',
# 'Content-Length': '165',
'Content-Type': 'application/json',
# 'Host': 'capi.tianyancha.com',
# 'Origin': 'https://www.tianyancha.com',
# 'Referer': 'https://www.tianyancha.com/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0',
'X-AUTH-TOKEN': 'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxODYzODIxNzUxNyIsImlhdCI6MTcyMDUxNDc2NCwiZXhwIjoxNzIzMTA2NzY0fQ.QYx-WxdPYFT-jL_nPpIjvvSIPMVyXbgQCs3b7CY0UJwKGyMrH79BSR6nfz7oy-sF3qbQyC8RSGApKQAbhi4GYg',
'X-TYCID': '34b3bab0127d11ee8588919796d351cc',
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Microsoft Edge";v="126"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'version': 'TYC-Web ',
}
if __name__ == '__main__':
# df = pd.read_excel(r'C:\Users\Bailihou\Desktop\sys_base_enterprise.xlsx', sheet_name='sys_base_enterprise')
# socialCodes = df['social_credit_code'].to_list()
# for socialCode in socialCodes:
# sql = f'select * from sys_base_enterprise_shareholder where social_credit_code="{socialCode}"'
# baseCore.cursor_.execute(sql)
# data_ = baseCore.cursor_.fetchone()
# if not data_:
# sql = f'select TYCID from EnterpriseInfo where SocialCode="{socialCode}"'
# baseCore.cursor.execute(sql)
# data = baseCore.cursor.fetchone()
# if data[0] == 'None' or not data[0]:
# continue
# item = f'{socialCode}|{data[0]}'
# baseCore.r.lpush('jiancaiEnterprise:gdxx', item)
# info = '91320600704042209U|2321992952'
# tycid = info.split('|')[1]
# socialCreditCode = info.split('|')[0]
# url = 'https://capi.tianyancha.com/cloud-company-background/companyV2/dim/holder/latest/announcement'
# sort = 1
# dics = []
# for page in range(1, 2):
# dataPost = {
# 'gid': f'{tycid}',
# 'historyType': None,
# 'keyword': "",
# 'pageNum': page,
# 'pageSize': 50,
# 'percentLevel': "-100",
# 'shareholderType': "-100",
# 'subscribedCapital': "-100",
# '_unUseParam': 0,
# }
# dataPost = json.dumps(dataPost)
# req = requests.post(url, headers=headers, data=dataPost)
# datasJson = req.json()['data']['result']
# for holder_info in datasJson:
    # name = holder_info['shareHolderName']  # shareholder name
    # shareHoldRation = holder_info['percent']  # shareholding ratio
    # shareHoldNum = holder_info['shareholdingNum']  # number of shares held
    # shareHoldUnit = holder_info['shareholdingNumUnit']  # share unit
    # shareType = holder_info['shareType']  # share type
    # year = holder_info['yearReport']  # report year
# dic = {
# 'socialCreditCode': socialCreditCode,
# 'name': name,
# 'shareHoldRation': shareHoldRation,
# 'shareHoldNum': shareHoldNum,
# 'shareHoldUnit': shareHoldUnit,
# 'shareType': shareType,
# 'year': year,
# 'sort': sort
# }
# sort += 1
# dics.append(dic)
# time.sleep(5)
# # print(json.dumps(dics, ensure_ascii=False))
# sendData('http://114.115.236.206:8088/sync/shareHolder', dics)
sql = 'select words_name from '
baseCore.close()
 import pandas as pd
@@ -2,7 +2,7 @@ import pandas as pd
 import pymongo

 # 7649
 data_list = []
-db_stroage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').ZZSN['7-17全球企业资讯删除数据']
+db_stroage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').ZZSN['新华丝路-丝路商机100+']
 # datas = db_stroage.find({"内容": {"$ne": None, "$exists": True}})
 # export records whose labels are empty
 datas = db_stroage.find()

@@ -11,11 +11,11 @@ for data in datas:
     del data['_id']
     del data['id']
-    if data['标题'] not in link:
-        data_list.append(data)
-        link.append(data['标题'])
+    # if data['标题'] not in link:
+    #     data_list.append(data)
+    #     link.append(data['标题'])
+    data_list.append(data)
     # print(data)
 print(len(data_list))
 df = pd.DataFrame(data_list)
-df.to_excel('./7-17全球企业资讯不保留数据.xlsx',index=False)
+df.to_excel('./新华丝路-丝路投资2.xlsx',index=False)
\ No newline at end of file
import json
import re
import threading
import time
import uuid
import pandas as pd
import pymongo
import redis
import requests
from bs4 import BeautifulSoup
from retry import retry
from elasticsearch import Elasticsearch
from base import BaseCore
from obs import ObsClient
import fitz
from urllib.parse import unquote
baseCore = BaseCore.BaseCore(sqlFlg=False)
log = baseCore.getLogger()
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'新华丝路-丝路商机100+']
lock = threading.Lock()
class EsMethod(object):
def __init__(self):
        # create the Elasticsearch client with credentials
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
self.index_name = 'subjectdatabase'
def queryatt(self,index_name,pnum,title):
body = {
"query": {
"bool": {
"must": [
{
"term": {
"sid": "1597878873601540098"
}
},
{
"term": {
"title.keyword": title
}
}
]
}
}
}
result = self.es.search(index=index_name
, doc_type='_doc'
, body=body)
# log.info(result)
return result
def clean_html_tag(content):
    # todo: in production, paragraphs are assumed to be delimited by </p>
ori_text = re.sub("(<\/p\s*>)", "\t", content)
    # handle image tags
ori_text = re.sub(r"<img.*?/>", "", ori_text)
tag_content_list = ori_text.split("\t") if "<p" in ori_text else ori_text
temp_content_list = []
if type(tag_content_list) is list:
for text in tag_content_list:
bs = BeautifulSoup(text, 'lxml')
ori_match_content = bs.text.strip()
temp_content_list.append(ori_match_content)
match_content = "\n".join(temp_content_list)
else:
bs1 = BeautifulSoup(tag_content_list, 'lxml')
match_content = bs1.text.strip()
# if "参考文献" not in tag_content_list:
# match_content = temp_content
# else:
# match_content = temp_content.split("参考文献")[0]
return match_content
def preprocess(text: str):
text = text.strip().strip('\n').strip()
text = re.sub(' +', '', text)
text = re.sub('\n+', '\n', text)
return text
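A quick check of the two helpers above on a hypothetical fragment (note that preprocess deletes space runs outright, which suits the Chinese text this script handles):

html = '<p>段落一</p><p><img src="x.png"/>段落二</p>'
print(preprocess(clean_html_tag(html)))
# -> 段落一\n段落二  (image stripped, paragraphs joined by newlines)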
def main(page, p, esMethod, title):
result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p,title=title)
total = result['hits']['total']['value']
# if total == 0:
# log.info('++++已没有数据+++++')
# return
try:
msglist = result['hits']['hits']
except:
log.info(f'error-----{result}')
return
log.info(f'---第{page}页{len(msglist)}条数据----共{total}条数据----')
for mms in msglist:
id = mms['_id']
title = mms['_source']['title']
try:
content = mms['_source']['content']
except:
continue
try:
contentWithTag = mms['_source']['contentWithTag']
except:
continue
try:
clean_content = clean_html_tag(content)
pre_content = preprocess(clean_content)
except:
pre_content = content
try:
summary = mms['_source']['summary']
except:
summary = ''
try:
clean_summary = clean_html_tag(summary)
pre_summary = preprocess(clean_summary)
except:
pre_summary = summary
origin = mms['_source']['origin']
publishDate = mms['_source']['publishDate']
log.info(f'{id}--{title}---')
        # store into the database
dic = {
"id": id,
"标题": title,
"摘要": pre_summary,
"内容": pre_content,
"带标签内容": contentWithTag,
"来源": origin,
"发布时间": publishDate,
}
db_storage.insert_one(dic)
def run_threads(esMethod):
    # read the excel file
page = 1
p = 200
df = pd.read_excel('24.7.30丝路投资.xlsx', sheet_name='Sheet3')
for i in range(len(df)):
title = df.iloc[i][0]
main(page, p, esMethod, title)
# threads = []
#
# for i in range(num_threads):
# page = j + i + 1
# p = j + i * 200
# thread = threading.Thread(target=main, args=(page, p, esMethod))
#
# threads.append(thread)
# thread.start()
#
# for thread in threads:
# thread.join()
if __name__ == "__main__":
j = 0
# for i in range(1):
# esMethod = EsMethod()
# # result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
# # total = result['hits']['total']['value']
# # if total == 0:
# # log.info('++++已没有数据+++++')
# # break
# start = time.time()
# num_threads = 5
# run_threads(num_threads, esMethod, j)
# j += 1000
#
# log.info(f'5线程 每个处理200条数据 总耗时{time.time() - start}秒')
esMethod = EsMethod()
run_threads(esMethod)
\ No newline at end of file
 import json
@@ -18,7 +18,7 @@ baseCore = BaseCore.BaseCore(sqlFlg=False)
 log = baseCore.getLogger()
 db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
-    '新华丝路-丝路投资']
+    '新华丝路-丝路商机']
 lock = threading.Lock()

@@ -37,7 +37,7 @@ class EsMethod(object):
             "must": [
                 {
                     "match": {
-                        "sid": "1597878873601540098"
+                        "sid": "1597878386361827330"
                     }
                 }
             ]

@@ -201,7 +201,7 @@ def run_threads(num_threads,esMethod,j):
 if __name__ == "__main__":
     j = 0
-    for i in range(2):
+    for i in range(1):
         esMethod = EsMethod()
         # result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
         # total = result['hits']['total']['value']
...
 # -*- coding: utf-8 -*-
+import datetime
 import os
 import random
 import sys
@@ -7,13 +8,23 @@ import logbook
 import logbook.more
 # core utilities
 import pymysql
+import redis
 from tqdm import tqdm

 # note: call BaseCore.close() before the program exits to release resources
 class BaseCore:
+    def __int__(self):
+        self.r = redis.Redis(host='114.116.90.53', port=6380, password='clbzzsn', db=6)
+        self.cnx_ = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project',
+                                    charset='utf8mb4')
+        self.cursor_ = self.cnx_.cursor()
+
     # sequence number
     __seq = 0
     # proxy pool database connection
-    __cnx_proxy =None
+    __cnx_proxy = None
     __cursor_proxy = None
     # agent pool
     __USER_AGENT_LIST = [

@@ -212,16 +223,17 @@ class BaseCore:
         try:
             self.__cursor_proxy.close()
             self.__cnx_proxy.close()
-        except :
+        except:
             pass

     def __init__(self):
         self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
                                            charset='utf8mb4')
-        self.__cursor_proxy= self.__cnx_proxy.cursor()
+        self.__cursor_proxy = self.__cnx_proxy.cursor()
         pass

     # compute elapsed time
-    def getTimeCost(self,start, end):
+    def getTimeCost(self, start, end):
         seconds = int(end - start)
         m, s = divmod(seconds, 60)
         h, m = divmod(m, 60)

@@ -234,6 +246,7 @@ class BaseCore:
         else:
             ms = int((end - start) * 1000)
             return "%d毫秒" % (ms)

     # format the current time
     # 1 : 2001-01-01 12:00:00 %Y-%m-%d %H:%M:%S
     # 2 : 010101120000 %y%m%d%H%M%S

@@ -248,9 +261,8 @@ class BaseCore:
         now_time = int(time.time() * 1000)
         return now_time

     # log format
-    def logFormate(self,record, handler):
+    def logFormate(self, record, handler):
         formate = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format(
             date=record.time,  # log time
             level=record.level_name,  # log level

@@ -260,8 +272,9 @@ class BaseCore:
             msg=record.message  # log message
         )
         return formate

     # get a logger
-    def getLogger(self,fileLogFlag=True, stdOutFlag=True):
+    def getLogger(self, fileLogFlag=True, stdOutFlag=True):
         dirname, filename = os.path.split(os.path.abspath(sys.argv[0]))
         dirname = os.path.join(dirname, "logs")
         filename = filename.replace(".py", "") + ".log"

@@ -330,7 +343,8 @@ class BaseCore:
             }
             ip_list.append(proxy)
         return ip_list

     def get_proxyIPPort(self):
         ip_list = []
         with self.__cursor_proxy as cursor:

@@ -348,4 +362,32 @@ class BaseCore:
                 ip_list.append(proxy)
         return ip_list
\ No newline at end of file
+
+    def getSidName(self, sid):
+        sqlSelect = f"SELECT words_name FROM `key_words` WHERE id = '{sid}'"
+        self.cursor_.execute(sqlSelect)
+        data = self.cursor_.fetchone()[0]
+        return data
+
+    # get this script's process PID
+    def getPID(self):
+        PID = os.getpid()
+        return PID
+
+    def getUniqueCode(self, abbr, serverId, threadId):
+        while True:
+            timeCode = self.r.blpop(['timeCode:sougou'], 2)
+            if timeCode:
+                timeCode = timeCode[1]
+                timeCode = timeCode.decode('utf-8')
+                break
+            else:
+                time.sleep(2)
+        pid = str(self.getPID())
+        if len(pid) < 4:
+            pid = pid.zfill(4)
+        elif len(pid) > 4:
+            pid = pid[0:4]
+        uniqueCode = abbr + str(datetime.datetime.now().strftime('%Y%m%d'))[2:] + serverId + pid + str(threadId) + str(timeCode)
+        return uniqueCode
@@ -609,6 +609,29 @@ if __name__ == '__main__':
     #               'selBoardCode': ''}
     # req = requests.get(url='http://eid.csrc.gov.cn/101812/index_2_f.html', headers=headers, data=payload)
     # print(req.status_code)
-    publish_time = '2023年10月5日 '
-    aaa = paserTime(publish_time)
-    print(aaa)
+    # publish_time = '2023年10月5日 '
+    # aaa = paserTime(publish_time)
+    # print(aaa)
+    test = """基于"industry_test"这个主题,以下是一个可能的产业链环节划分:
+
+```json
+{
+    "上游": ["原材料开采", "原料加工", "零部件生产"],
+    "中游": ["组装与集成", "技术研发", "质量检测"],
+    "下游": ["产品销售", "售后服务", "市场推广"]
+}
+```
+
+在这个例子中:
+- 上游环节涉及原材料的获取和处理,以及生产产业链所需的基础零部件。
+- 中游环节关注产品的组装、技术研发和质量控制,是连接上游和下游的关键环节。
+- 下游环节则直接面向消费者,包括销售、服务以及产品的市场推广活动。"""
+    import re
+    json_pattern = r'{.*?}'
+    matches = re.findall(json_pattern, test, re.DOTALL)
+    print(matches[0])
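Continuing from the test string in the diff above: the non-greedy {.*?} with re.DOTALL stops at the first closing brace, which is enough here because the sample JSON has no nested objects; json.loads completes the round trip:

import json

parsed = json.loads(matches[0])
print(parsed['上游'])  # -> ['原材料开采', '原料加工', '零部件生产']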
import datetime
import json
import os.path
import random
import pymongo
from bson import ObjectId
from openpyxl import Workbook, load_workbook
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'天眼查登录信息']
class File():
    # create the workbook
def createFile(self, file_name):
if os.path.exists(file_name):
return
else:
wb = Workbook()
sheet = wb.active
            # rename the default sheet
sheet.title = "需处理企业"
sheet.append(["企业名称", "社会信用代码"])
            # create a second sheet
sheet2 = wb.create_sheet("获取基本信息成功企业")
sheet2.append(["企业名称", "采到的企业名称", "社会信用代码", "采到的信用代码"])
wb.save(file_name)
wb.close()
    # delete the file
def deleteFile(self, file_name):
if os.path.exists(file_name):
os.remove(file_name)
else:
pass
    # append rows
def appenddata(self, file_name, sheet, data):
        # open the existing Excel file
wb = load_workbook(file_name)
        # pick the sheet to append to
sheet = wb[sheet]
sheet.append(data)
        # save the Excel file
wb.save(file_name)
wb.close()
class Token():
    # fetch a usable token
def get_cookies(self):
flg = False
query = { "fenghaoTime": { "$lt": str(datetime.datetime.now() - datetime.timedelta(hours=2))}}
result = db_storage.find_one(query, sort=[('updateTime', 1)])
# results = db_storage.find({}, sort=[('updateTime', 1)])
if result:
flg = True
# for result in results:
# if result['fenghaoTime'] < result['updateTime']:
# flg = True
# break
if flg:
cookies = result['cookies']
id_token = result['_id']
user_name = result['name']
return cookies, id_token, user_name
else:
return '', '', ''
    # delete an expired token
def delete_token(self, cookie_):
deletesql = f"delete from QCC_token where id='{cookie_}' "
cursor.execute(deletesql)
cnx.commit()
    # token bookkeeping
def updateTokeen(self, id_token, type):
if type == 1:
        # session expired: delete the token
cursor.execute(f"delete from QCC_token where id={id_token}")
if type == 2:
        # account banned: record the ban time
filter = {'_id': ObjectId(id_token)}
            # the update
update = {'$set': {'fenghaoTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}}
            # apply the update
db_storage.update_one(filter, update)
if type == 3:
        # update the last-used time
filter = {'_id': ObjectId(id_token)}
            # the update
update = {'$set': {'updateTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}}
            # apply the update
db_storage.update_one(filter, update)
cnx.commit()
class Tag():
    # remove tags carrying a specific attribute
def deletep(self, soup, tag_, attribute_to_delete, value_to_delete):
if attribute_to_delete and value_to_delete:
            # find the tags with the given attribute and remove them
tags = soup.find_all(tag_, {attribute_to_delete: value_to_delete})
for tag in tags:
# print(tag)
tag.decompose()
else:
tags = soup.find_all(tag_)
for tag in tags:
# print(tag)
tag.decompose()
    # remove empty tags
def deletek(self, soup):
        # remove blank tags (e.g. <p></p>, <p><br></p>); img, video, hr excluded
for i in soup.find_all(lambda tag: len(tag.get_text()) == 0 and tag.name not in ["img", "video",
"br"] and tag.name != "br" or tag.get_text() == ' ' or tag.get_text() == ' '):
for j in i.descendants:
if j.name in ["img", "video", "br"]:
break
else:
i.decompose()
    # remove span tags
def deletespan(self, td):
spans = td.find_all('span', class_='app-copy copy-button-item')
for span in spans:
if '复制' in span.text:
span.extract() # 删除span标签
spans2 = td.find_all('span', slot='content')
for span2 in spans2:
if '趋势图' in span2.text:
span2.extract()
spans3 = td.find_all('span', class_='m-l-r-10')
for span3 in spans3:
if '年报' in span3.text:
span3.extract()
spans4 = td.find_all('span', class_='text-span')
for span4 in spans4:
span4.extract()
if __name__ == '__main__':
token = Token()
print(token.get_cookies())