Commit d43a13da  Author: 薛凌堃

Merge remote-tracking branch 'origin/master'

@@ -14,6 +14,12 @@ def conn11():
     cursor = conn.cursor()
     return conn, cursor
+def conn144():
+    conn = pymysql.Connect(host='114.115.159.144', port=3306, user='caiji', passwd='zzsn9988', db='caiji',
+                           charset='utf8')
+    cursor = conn.cursor()
+    return conn, cursor
 # Corporate announcements
 def shizhiCodeFromSql():
     conn, cursor = conn11()
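Both connection helpers return a raw (conn, cursor) pair that every caller must close by hand; a context-manager wrapper would make the try/finally pattern below reusable. A minimal sketch only (the name mysql_cursor is hypothetical, not part of this commit):

from contextlib import contextmanager

@contextmanager
def mysql_cursor(connect=conn144):
    # open a connection via the given factory and guarantee cleanup
    conn, cursor = connect()
    try:
        yield cursor
    finally:
        cursor.close()
        conn.close()

With this, `with mysql_cursor() as cursor: cursor.execute(...)` closes both handles even on error.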
@@ -31,6 +37,7 @@ def shizhiCodeFromSql():
     finally:
         cursor.close()
         conn.close()
 # Corporate announcements
 def yahooCodeFromSql():
     conn, cursor = conn11()
@@ -49,6 +56,25 @@ def yahooCodeFromSql():
         cursor.close()
         conn.close()
+# Codes for Sina's NYSE-listed stocks
+def sinausstockCodeFromSql():
+    conn, cursor = conn144()
+    try:
+        gn_query = "select ticker from mgzqyjwyh_list where state=2 and exchange='NYSE';"
+        cursor.execute(gn_query)
+        gn_result = cursor.fetchall()
+        gn_social_list = [item[0] for item in gn_result]
+        print('sinausstockCodeFromSql开始将股票代码放入redis=======')
+        for item in gn_social_list:
+            r.rpush('sina_usstock:securities_code', item)
+        print('sinausstockCodeFromSql将股票代码放入redis结束')
+    except Exception as e:
+        log.info(f"数据查询异常: {e}")
+    finally:
+        cursor.close()
+        conn.close()
 def yahooCode_task():
     # Instantiate a scheduler
     scheduler = BlockingScheduler()
@@ -58,9 +84,12 @@ def yahooCode_task():
     scheduler.add_job(yahooCodeFromSql, 'cron', day='*/3', hour=0, minute=0)
     # Runs once a day
     scheduler.add_job(shizhiCodeFromSql, 'cron', hour=10, minute=0)
+    # Runs every 3 days (cron day='*/3' fires on days 1, 4, 7, ... of each month)
+    scheduler.add_job(sinausstockCodeFromSql, 'cron', day='*/3', hour=0, minute=0)
     try:
-        yahooCodeFromSql()  # run once before the schedule starts
-        shizhiCodeFromSql()  # run once before the schedule starts
+        # yahooCodeFromSql()  # run once before the schedule starts
+        # shizhiCodeFromSql()  # run once before the schedule starts
+        sinausstockCodeFromSql()  # run once before the schedule starts
         scheduler.start()
     except Exception as e:
         print('定时采集异常', e)
......
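sinausstockCodeFromSql publishes tickers onto the sina_usstock:securities_code list with rpush; the Sina collector below drains that list with lpop (see getCodeFromRedis). A minimal sketch of the consuming side, assuming the same module-level r client:

# lpop returns None once the queue is empty; redis-py returns bytes by default
raw = r.lpop('sina_usstock:securities_code')
if raw is not None:
    ticker = raw.decode('utf-8')
    print('processing', ticker)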
# -*- coding: utf-8 -*-
@@ -373,6 +373,28 @@ class YahooCaiwu(object):
             currency = ''
         return currency
+    # Comparison-index calculation: ask the service to recompute, retrying while it returns -200
+    def calculateIndexReq(self):
+        get_url = 'http://114.115.236.206:8088/sync/calculateIndex'
+        try:
+            params = {
+                'type': 2
+            }
+            resp = requests.get(get_url, params=params)
+            print(resp.text)
+            text = json.loads(resp.text)
+            codee = text['code']
+            while codee == -200:
+                time.sleep(600)
+                resp = requests.get(get_url, params=params)  # retry with the same query params
+                print(resp.text)
+                text = json.loads(resp.text)
+                codee = text['code']
+                if codee == -200:
+                    break  # give up after a single extra attempt
+            print('调用接口成功!!')
+        except Exception:
+            print('调用失败!')
 if __name__ == '__main__':
     # parse_excel()
     # get_content1()
@@ -383,8 +405,11 @@ if __name__ == '__main__':
         securitiescode = yahoo.getCodeFromRedis()
         yahoo.get_content2(securitiescode)
     except Exception as e:
+        print('没有数据暂停5分钟')
+        yahoo.calculateIndexReq()
         if securitiescode:
             yahoo.r.rpush('NoticeEnterprise:securities_code', securitiescode)
         else:
             time.sleep(300)
-            print('没有数据暂停5分钟')
import configparser
@@ -20,6 +20,7 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 from operator import itemgetter
 from itertools import groupby
 import datetime
+from decimal import Decimal

 class SinaUsstock(object):
@@ -54,13 +55,19 @@ class SinaUsstock(object):
             seriesValue = tddoc.find('td').text().split(' ')
             for i in range(0, len(pdate)):
                 value = seriesValue[i]
-                if '亿' in value:
-                    value = value.replace("亿", "*100000000")
-                    value = eval(value)
-                elif '万' in value:
-                    value = value.replace("万", "*10000")
-                    value = eval(value)
-                vvla = str(value)
+                try:
+                    if '亿' in value:
+                        value = value.replace("亿", "").replace(",", "")
+                        value = Decimal(value) * Decimal('100000000')
+                        # value = eval(value)
+                    elif '万' in value:
+                        value = value.replace("万", "").replace(",", "")
+                        value = Decimal(value) * Decimal('10000')
+                        # value = eval(value)
+                except Exception as e:
+                    print(e)
+                    print(value)
+                vvla = str(value).replace(",", "")
                 serisemsg = {
                     'name': seriesName,
                     'value': vvla,
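The switch from eval to Decimal removes both the code-execution risk of eval on scraped text and binary-float rounding; for example, a cell reading 12.5亿 now converts exactly:

from decimal import Decimal

value = '12.5亿'.replace('亿', '').replace(',', '')
print(Decimal(value) * Decimal('100000000'))  # 1250000000.0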
@@ -71,6 +78,31 @@ class SinaUsstock(object):
         return seriesList
+    # Check whether this stock code has been collected before
+    def check_code(self, com_code):
+        r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
+        res = r.exists('com_sinacaiwushuju_code::' + com_code)
+        # if the key exists, the company has been collected before (res = 1)
+        if res:
+            return False  # not the first collection
+        else:
+            return True  # first collection
+
+    # Check whether a reporting period has been collected for this stock code
+    def check_date(self, com_code, info_date):
+        r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
+        res = r.sismember('com_sinacaiwushuju_code::' + com_code, info_date)  # note: stored as a redis set
+        if res:
+            return True
+        else:
+            return False
+
+    # Save the collected reporting periods for this stock code into redis
+    def add_date(self, com_code, date_list):
+        r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
+        # iterate over date_list and add each date to the set
+        for date in date_list:
+            res = r.sadd('com_sinacaiwushuju_code::' + com_code, date)
     def getCodeFromRedis(self):
         securitiescode = self.r.lpop('sina_usstock:securities_code')
         securitiescode = securitiescode.decode('utf-8')
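Together, the three helpers above implement the per-company dedupe used by sendToFinance: check_code flags a company never seen before (ynFirst), check_date skips reporting periods already recorded, and add_date marks periods as done after a successful save. A usage sketch, assuming an instance s and illustrative values:

s = SinaUsstock()
com_code, report_date = 'AAPL', '2023-09-30'
if s.check_code(com_code):
    print('first collection for this company')  # downstream this sets ynFirst = True
if not s.check_date(com_code, report_date):
    # ...collect and save the period, then record it
    s.add_date(com_code, [report_date])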
@@ -209,7 +241,7 @@ class SinaUsstock(object):
         # convert the data format and send it to the interface
         annualzb = zbl1 + zbl3 + zbl5
-        annualzb = self.groupZbData(annualzb, stock, social_credit_code, 'annual')
+        annualzb = self.groupZbData(annualzb, stock, social_credit_code, 'year')
         self.sendToFinance(annualzb)
         quarterzb = zbl2 + zbl4 + zbl6
         quarterzb = self.groupZbData(quarterzb, stock, social_credit_code, 'quarter')
@@ -228,15 +260,26 @@ class SinaUsstock(object):
     def sendToFinance(self, zbmsg):
         for zbb in zbmsg:
+            com_code = zbb['securitiesCode']
+            com_date = zbb['date']
+            # has this stock code been collected before?
+            if self.check_code(com_code):
+                zbb['ynFirst'] = True
             if len(zbb) != 0:
                 # call Kaige's interface to store the data
                 data = json.dumps(zbb)
                 # no interface yet
-                url_baocun = ''
+                url_baocun = 'http://114.115.236.206:8088/sync/finance/sina'
                 # url_baocun = 'http://114.115.236.206:8088/sync/finance/df'
                 for nnn in range(0, 3):
                     try:
                         res_baocun = requests.post(url_baocun, data=data)
+                        # record the collected stock code and date to mark them as collected
+                        com_date_list = []
+                        com_date_list.append(com_date)
+                        self.add_date(com_code, com_date_list)  # add_date expects a list of dates
                         self.logger.info(res_baocun.text)
                         break
                     except:
@@ -309,7 +352,7 @@ class SinaUsstock(object):
 if __name__ == '__main__':
     sinaUsstock = SinaUsstock()
     # securitiescode = sinaUsstock.r.lpop('sina_usstock:securities_code')
-    securitiescode = sinaUsstock.getCodeFromRedis()
+    # securitiescode = sinaUsstock.getCodeFromRedis()
     securitiescode = 'AAPL'
     try:
         sinaUsstock.get_content2(securitiescode)
......
import datetime
import json
import time
import requests
from kafka import KafkaProducer
from retry import retry
from bs4 import BeautifulSoup
from requests.packages import urllib3
from base import BaseCore
urllib3.disable_warnings()
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
    'Accept': 'application/json, text/plain, */*',
}
baseCore = BaseCore.BaseCore()
cnx = baseCore.cnx
cursor = baseCore.cursor
log = baseCore.getLogger()
r = baseCore.r
taskType = '纳斯达克/企业动态'
# Fetch a company's basic information
def getInfomation(social_code):
    sql = f"select * from mgzqyjwyh_list where state=2 and xydm='{social_code}';"
    cursor.execute(sql)
    data = cursor.fetchone()
    return data
# Time conversion (the parameter shadows the time module inside this function)
def conversionTime(time):
    try:
        date_obj = datetime.datetime.strptime(time, "%B %d, %Y")
    except:
        date_obj = datetime.datetime.strptime(time, "%b%d,%Y")
    pub_time = date_obj.strftime("%Y-%m-%d")
    return pub_time
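The two strptime patterns match the two timestamp shapes the site produces: 'September 13, 2023' from page type A, and 'Sep13,2023' after getDicB concatenates the three space-separated parts (dates here are illustrative):

import datetime

print(datetime.datetime.strptime('September 13, 2023', '%B %d, %Y').strftime('%Y-%m-%d'))  # 2023-09-13
print(datetime.datetime.strptime('Sep13,2023', '%b%d,%Y').strftime('%Y-%m-%d'))            # 2023-09-13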
# Get the total record count
@retry(tries=3, delay=1)
def getTotal(gpdm):
    url = f'https://api.nasdaq.com/api/news/topic/articlebysymbol?q={gpdm}|stocks&offset=0&limit=100&fallback=false'
    req = requests.get(url, headers=headers, verify=False)
    req.encoding = req.apparent_encoding
    total = req.json()['data']['totalrecords']
    req.close()
    return total
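totalrecords drives the offset pagination in doJob below; with a total of, say, 250 records the loop requests offsets 0, 100 and 200, 100 rows at a time:

total = 250  # illustrative value
print(list(range(0, total + 1, 100)))  # [0, 100, 200]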
# Get the article list
@retry(tries=3, delay=1)
def getDataList(gpdm, offest, social_code):
    data_list = []
    url = f'https://api.nasdaq.com/api/news/topic/articlebysymbol?q={gpdm}|stocks&offset={offest}&limit=100&fallback=false'
    # print(url)
    req = requests.get(url, headers=headers, verify=False)
    req.encoding = req.apparent_encoding
    datas = req.json()['data']['rows']
    if datas != []:
        for data in datas:
            title = data['title']
            author = data['publisher']
            url = data['url']
            if 'http' not in url:
                url = 'https://www.nasdaq.com' + url
            data_list.append([url, title, author, social_code])
    req.close()
    return data_list
@retry(tries=3, delay=1)
def getsoup(url):
    req = requests.get(url, headers=headers, verify=False)
    # req = session.get(url)
    req.encoding = req.apparent_encoding
    soup = BeautifulSoup(req.text, 'lxml')
    return soup
# Parse page type A
def getDicA(data, soup):
    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    url = data[0]
    pub_time = soup.find('p', class_='jupiter22-c-author-byline__timestamp').text.split('—')[0].strip()
    pub_time = conversionTime(pub_time)
    contentWithTag = soup.find('div', class_='nsdq-l-grid__item syndicated-article-body')
    try:
        contentWithTag.find('div', class_='jupiter22-c-tags jupiter22-c-tags-default').decompose()
    except:
        pass
    try:
        contentWithTag.find('div', class_='taboola-placeholder').decompose()
    except:
        pass
    try:
        divs_del = contentWithTag.find_all('div', class_='ads__inline')
        for div_del in divs_del:
            div_del.decompose()
    except:
        pass
    try:
        divs_del = contentWithTag.find_all('script')
        for div_del in divs_del:
            div_del.decompose()
    except:
        pass
    content = contentWithTag.text
    dic_news = {
        'attachmentIds': '',
        'author': data[2],
        'content': content,
        'contentWithTag': str(contentWithTag),
        'createDate': time_now,
        'deleteFlag': '0',
        'id': '',
        'keyWords': '',
        'lang': 'en',
        'origin': '纳斯达克',
        'publishDate': pub_time,
        'sid': '1684032033495392257',
        'sourceAddress': url,  # link to the original article
        'summary': '',
        'title': data[1],
        'type': 2,
        'socialCreditCode': data[3],
        'year': pub_time[:4]
    }
    return dic_news
# Parse page type B
def getDicB(data, soup):
    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    url = data[0]
    pub_time = soup.find('div', class_='timestamp').find('time').text
    pub_time = pub_time.split(' ')[0] + pub_time.split(' ')[1] + pub_time.split(' ')[2]
    pub_time = conversionTime(pub_time)
    contentWithTag = soup.find('div', class_='body__content')
    try:
        divs_del = contentWithTag.find_all('div', class_='ads__inline')
        for div_del in divs_del:
            div_del.decompose()
    except:
        pass
    try:
        divs_del = contentWithTag.find_all('script')
        for div_del in divs_del:
            div_del.decompose()
    except:
        pass
    content = contentWithTag.text
    # rewrite site-relative image links to absolute ones
    imgs = contentWithTag.find_all('img')
    for img in imgs:
        src = img.get('src')
        if not src or 'http' in src:
            continue  # skip missing or already-absolute srcs
        src_ = 'https://www.nasdaq.com' + src
        contentWithTag = str(contentWithTag).replace(src, src_)
    dic_news = {
        'attachmentIds': '',
        'author': data[2],
        'content': content,
        'contentWithTag': str(contentWithTag),
        'createDate': time_now,
        'deleteFlag': '0',
        'id': '',
        'keyWords': '',
        'lang': 'en',
        'origin': '纳斯达克',
        'publishDate': pub_time,
        'sid': '1684032033495392257',
        'sourceAddress': url,  # link to the original article
        'summary': '',
        'title': data[1],
        'type': 2,
        'socialCreditCode': data[3],
        'year': pub_time[:4]
    }
    return dic_news
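The image rewrite in getDicB only makes sense for site-relative src values; urllib.parse.urljoin handles both relative and already-absolute srcs, so a variant built on it would be more robust (a sketch of an alternative, not what this script does):

from urllib.parse import urljoin

print(urljoin('https://www.nasdaq.com', '/articles/chart.png'))            # https://www.nasdaq.com/articles/chart.png
print(urljoin('https://www.nasdaq.com', 'https://cdn.example.com/x.png'))  # absolute srcs pass through unchanged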
# Send the data to Kafka
@retry(tries=3, delay=1)
def sendKafka(dic_news, start_time):
    producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
    kafka_result = producer.send("researchReportTopic",
                                 json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
    print(kafka_result.get(timeout=10))
    dic_result = {
        'success': 'true',
        'message': '操作成功',
        'code': '200',
    }
    log.info(dic_result)
    # sent successfully; record it in the log
    state = 1
    takeTime = baseCore.getTimeCost(start_time, time.time())
    baseCore.recordLog(dic_news['socialCreditCode'], taskType, state, takeTime, dic_news['sourceAddress'], '')
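sendKafka constructs a fresh KafkaProducer on every call, which opens a new broker connection per article; a module-level producer can be reused across sends. A sketch under that assumption (sendKafka2 is a hypothetical variant, not in the source):

producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])  # created once, reused

def sendKafka2(dic_news):
    # same topic and payload encoding as sendKafka, but with the shared producer
    payload = json.dumps(dic_news, ensure_ascii=False).encode('utf8')
    return producer.send("researchReportTopic", payload).get(timeout=10)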
# Save the record to the database for dedupe checks
@retry(tries=3, delay=1)
def insertMysql(social_code, link):
    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
    # fields for the news record
    list_info = [
        social_code,
        link,
        '纳斯达克',
        '2',
    ]
    cursor.execute(insert_sql, tuple(list_info))
    cnx.commit()
# Check whether an article has already been collected
@retry(tries=3, delay=1)
def selectUrl(url, social_code):
    sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='2' '''
    cursor.execute(sel_sql, (url, social_code))
    selects = cursor.fetchone()
    return selects
def doJob():
    while True:
        social_code = ''
        data_enterprise = getInfomation(social_code)
        gpdm = data_enterprise[3]
        social_code = data_enterprise[6]
        # gpdm = 'GOOGL'
        # social_code = 'ZZSN22080900000013'
        start_time = time.time()
        try:
            total = getTotal(gpdm)
        except:
            log.error(f'{social_code}==={gpdm}===获取总数失败')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, '', f'{social_code}==={gpdm}===获取总数失败')
            baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
            continue
        for offest in range(0, total + 1, 100):
            try:
                data_list = getDataList(gpdm, offest, social_code)
            except:
                log.error(f'{social_code}==={gpdm}===获取信息列表失败({offest}~{offest + 100}条)')
                state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, '',
                                   f'{social_code}==={gpdm}===获取信息列表失败({offest}~{offest + 100}条)')
                continue
            # the API only exposes the first 10,000 records
            if data_list != []:
                for data in data_list:
                    start_time = time.time()
                    url = data[0]
                    selects = selectUrl(url, social_code)
                    if selects:
                        log.info(f'{url}===已采集过')
                        continue  # full run: skip the seen article and keep going
                        # break  # incremental run: stop at the first seen article
                    try:
                        soup = getsoup(url)
                        try:
                            try:
                                dic_info = getDicA(data, soup)
                            except:
                                dic_info = getDicB(data, soup)
                        except:
                            log.error(f'{url}===正文解析失败')
                            state = 0
                            takeTime = baseCore.getTimeCost(start_time, time.time())
                            baseCore.recordLog(social_code, taskType, state, takeTime, '', f'{url}===正文解析失败')
                            continue
                        try:
                            sendKafka(dic_info, start_time)
                            try:
                                insertMysql(social_code, url)
                            except:
                                log.error(f'{url}===数据入库失败')
                                state = 0
                                takeTime = baseCore.getTimeCost(start_time, time.time())
                                baseCore.recordLog(social_code, taskType, state, takeTime, '', f'{url}===数据入库失败')
                        except Exception as e:
                            print(e)
                            log.error(f'{url}===发送kafka失败')
                            state = 0
                            takeTime = baseCore.getTimeCost(start_time, time.time())
                            baseCore.recordLog(social_code, taskType, state, takeTime, '', f'{url}===发送kafka失败')
                        time.sleep(1)
                    except:
                        log.error(f'{url}===页面访问失败')
                        state = 0
                        takeTime = baseCore.getTimeCost(start_time, time.time())
                        baseCore.recordLog(social_code, taskType, state, takeTime, '', f'{url}===页面访问失败')
                        break
            else:
                break
        break
if __name__ == "__main__":
doJob()
baseCore.close()