#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@author : v_jiaohaicheng@baidu.com
@des :头条打码工具 V1.0 pyppeteer实现
"""
import hashlib
import os
import asyncio
import time
import requests
from pyppeteer import launch
from queue import Queue
import config
from threading import Thread
import copy
# Queue of verified s_v_web_id cookie values waiting to be uploaded.
wait_verify_cookies_queue = Queue()

# Starting value for the success counter, taken from the project config.
SUCCESS_UPLOAD_COOKIE_NUM = config.SUCCESS_UPLOAD_COOKIE_NUM

# Running count of successfully solved captchas; incremented by check().
NOW_NUM = copy.deepcopy(SUCCESS_UPLOAD_COOKIE_NUM)
async def handle(browser, url, s_v_web_id, username):
    """
    Open a fresh page, seed its cookie, load ``url`` and run the captcha check.

    The page is always closed before returning, including when any step
    raises. Any exception is printed and recorded by setting
    ``config.BROWSER_ERROR`` to True; it is not re-raised.

    :param browser: pyppeteer browser used to create the page
    :param url: address to navigate to
    :param s_v_web_id: value stored in the ``s_v_web_id`` cookie before loading
    :param username: passed through to ``check_verify``
    :return: None
    """
    page = None
    try:
        page = await browser.newPage()
        await page.setViewport(viewport={'width': 1365, 'height': 768})
        await page.setJavaScriptEnabled(enabled=True)
        # Start from a clean cookie jar, then install the supplied token.
        await page.deleteCookie()
        await page.setCookie({"name": "s_v_web_id", "value": "{}".format(s_v_web_id), "domain": "www.toutiao.com"})
        await page.goto(url)
        await asyncio.sleep(1)
        # Look for the captcha dialog and wait for it to be solved.
        await check_verify(page, url, username)
    except Exception as e:
        print(e, e.__traceback__.tb_lineno)
        config.BROWSER_ERROR = True
    finally:
        # Close the tab on every path so a failed navigation does not leak it.
        if page is not None:
            try:
                await page.close()
            except Exception as e:
                print(e, e.__traceback__.tb_lineno)
                config.BROWSER_ERROR = True
async def check(page, times, status_code, username, url):
    """
    Poll the captcha container about once per second until it is hidden.

    Each round reads the ``style`` attribute of ``div#captcha_container``.
    When it contains ``"none"`` the captcha is treated as solved: the
    ``s_v_web_id`` cookie is read, validated with ``verify_code_cookie`` and,
    if valid, queued for upload. ``upload_cookie`` is called once the queue
    holds at least ``config.UPLOAD_SIZE`` entries.

    :param page: pyppeteer page showing the captcha
    :param times: maximum number of polls
    :param status_code: value returned unchanged if the polls run out
    :param username: passed to ``upload_cookie``
    :param url: address used to validate the cookie
    :return: True once the container is hidden, otherwise ``status_code``
    """
    global NOW_NUM
    while times > 0:
        style = await page.Jeval("div#captcha_container", 'node => node.getAttribute("style")')
        # getAttribute yields None when the node has no style attribute;
        # treat that as "still visible" rather than raising TypeError.
        if "none" in (style or ""):
            status_code = True
            cookie = await get_cookie(page)
            print("打码后的 cookie:{}".format(cookie))
            if await verify_code_cookie(url, cookie, page):
                print("本次验证码有效")
                NOW_NUM += 1
                wait_verify_cookies_queue.put(cookie)
                pending = wait_verify_cookies_queue.qsize()
                # >= so a queue that grew past the threshold still triggers.
                if pending >= config.UPLOAD_SIZE:
                    print(
                        "有效验证码已达到{}个,正在上传".format(
                            config.UPLOAD_SIZE))
                    await upload_cookie(username)
                else:
                    print(
                        "本次验证码已保存 ({}/{}),待上传,当前已成功打码 {}".format(
                            pending, config.UPLOAD_SIZE, NOW_NUM))
            else:
                print("本次验证码无效")
            return status_code
        times -= 1
        if times > 0:
            # Non-blocking wait: time.sleep() would stall the event loop
            # that the browser connection runs on.
            await asyncio.sleep(1)
    print("超时未打码")
    return status_code
async def check_verify(page, url, username):
    """
    Check once whether the loaded page contains the captcha container.

    If ``//div[@id='captcha_container']`` matches, the user is prompted and
    ``check`` is awaited with ``config.REFRESH_TIME`` polls. Any exception is
    printed together with its line number and otherwise ignored.

    :param page: pyppeteer page to inspect
    :param url: forwarded to ``check``
    :param username: forwarded to ``check``
    :return: None
    """
    try:
        containers = await page.Jx("//div[@id='captcha_container']")
        if len(containers) == 0:
            # No captcha on this page, nothing to wait for.
            return
        status_code = False
        print("检查到验证码,请{}s内完成打码".format(config.REFRESH_TIME))
        # Wait for the captcha to be solved.
        await check(
            page=page,
            times=config.REFRESH_TIME,
            status_code=status_code,
            username=username,
            url=url,
        )
    except Exception as e:
        print(e, "\n", e.__traceback__.tb_lineno)
async def verify_code_cookie(url, cookie, page):
    """
    Reload ``url`` with ``cookie`` as ``s_v_web_id`` and report whether it works.

    The cookie counts as valid only when the reloaded page has no
    ``captcha_container`` div and does have a ``profile-tab-feed`` div.

    :param url: address to reload
    :param cookie: ``s_v_web_id`` value to validate
    :param page: pyppeteer page used for the request
    :return: True if the cookie is valid, otherwise False
    """
    await page.deleteCookie()
    # Install the candidate value as the only cookie before reloading.
    token_cookie = {"name": "s_v_web_id", "value": "{}".format(cookie), "domain": "www.toutiao.com"}
    await page.setCookie(token_cookie)
    await page.goto(url, {'waitUntil': ['load', 'networkidle0']}, timeout=100000)
    await asyncio.sleep(1)
    captcha_nodes = await page.Jx("//div[@id='captcha_container']")
    feed_nodes = await page.Jx("//div[@class='profile-tab-feed']")
    if len(captcha_nodes) > 0:
        return False
    return len(feed_nodes) > 0
async def upload_cookie(username, retry_times=config.RETRY_TIMES, timeout=config.TIMEOUT):
    """
    Placeholder for uploading the queued cookies; the body does nothing.

    The original description states the intent: upload the valid cookies
    and save them locally if the upload fails. No argument is used here.

    :param username: unused
    :param retry_times: unused; default is read from ``config.RETRY_TIMES``
    :param timeout: unused; default is read from ``config.TIMEOUT``
    :return: None
    """
    return None
async def get_cookie(page):
    """
    Return the value of the page's ``s_v_web_id`` cookie.

    :param page: pyppeteer page whose cookies are read
    :return: the cookie value, or None when no such cookie exists
    """
    jar = await page.cookies()
    return next(
        (entry["value"] for entry in jar if entry.get("name") == "s_v_web_id"),
        None,
    )
async def save_cookie(lis):
    """
    Write cookies that failed to upload to ``./result/upload_fail_cookie.txt``.

    Each entry goes on its own line so the values can be read back
    individually. The file is opened in "w" mode, so it is overwritten on
    every call.

    :param lis: iterable of cookie values; each is converted with ``str``
    :return: None
    """
    save_path = R"./result"
    os.makedirs(save_path, exist_ok=True)
    target = os.path.join(save_path, "upload_fail_cookie.txt")
    with open(target, "w", encoding="utf-8") as fp:
        # One entry per line; without a separator the values run together.
        fp.writelines("{}\n".format(text) for text in lis)
    print("本次上传异常的cookie已保存在本地")
async def close():
    """
    Force-kill the Chromium process and the packaged tool executable.

    Runs two ``taskkill`` shell commands via ``os.system``, in order. The
    original note says this is meant for when the browser has misbehaved.

    :return: None
    """
    kill_commands = (
        "taskkill /f /t /im Chromium.exe",
        "taskkill /f /t /im 头条打码工具.exe",
    )
    for command in kill_commands:
        os.system(command)
async def _flush_pending_cookies(username):
    """Upload any verified cookies still waiting in the queue."""
    pending = wait_verify_cookies_queue.qsize()
    if pending > 0:
        print(
            "检测到还有{}个cookie未上传".format(pending))
        await upload_cookie(username)


async def main():
    """
    Program entry point: ask for a user name, then visit every configured URL.

    Each URL in ``config.URL_LIST`` is processed with ``handle``. If
    ``config.BROWSER_ERROR`` is set, queued cookies are uploaded, the user is
    asked to press Enter, ``close`` kills the processes and the loop stops.
    When the list is exhausted normally, queued cookies are uploaded and the
    browser is closed.

    :return: None
    """
    username = input("请输入百度用户名:")
    while username == "":
        username = input("请输入百度用户名:")
    browser = await launch({'headless': False, 'dumpio': True, 'args': ['--disable-infobars', '--ignore-certificate-errors']})
    for num, url in enumerate(config.URL_LIST):
        if config.BROWSER_ERROR:
            # The browser is in a bad state: save what we have and stop.
            await _flush_pending_cookies(username)
            print("请按回车键关闭程序后,重启程序")
            input()
            await close()
            break
        if num > 0 and num % 10 == 0:
            print("成功发起请求:{}次".format(num))
        try:
            s_v_web_id = ""
            await handle(browser, url, s_v_web_id, username)
        except Exception as e:
            print(e)
    else:
        # Normal completion: do not drop a partial batch of cookies or
        # leave the browser process running.
        await _flush_pending_cookies(username)
        try:
            await browser.close()
        except Exception as e:
            print(e)
def run():
    """
    Run ``main`` to completion on the current event loop.

    :return: None
    """
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
if __name__ == '__main__':
    # Thread.run() calls the target in the current thread; no new thread is
    # started, so run() executes synchronously here.
    launcher = Thread(target=run)
    launcher.run()
# 版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
# 文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/156883.html