仅限技术研究,请勿刷赞刷阅读
核心代码参考了 https://114114.xyz/article/discourse_org_read_and_liked
使用协程,但是并未使用并发请求,仍是同步执行,方便后期添加异步任务。
配置文件conf.json
{
"csrf-token": "",
"cookies": {
"_forum_session": "",
"cf_clearance": "",
"_t": ""
},
"connect-cookies": {
"auth.session-token": "",
"cf_clearance": ""
}
}
代码
#!/usr/bin/env python
import httpx
import asyncio
import json
import copy
from pathlib import Path
from pyquery import PyQuery
HEADERS = {
"Accept": "application/json, text/javascript, */*; q=0.01",
"Host": None,
"Discourse-Logged-In": "true",
"Discourse-Present": "true",
"Discourse-Track-View": "true",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4.1 "
"Safari/605.1.15",
"Referer": None,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"X-Requested-With": "XMLHttpRequest",
}
CONNECT_HEADERS = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Host": "connect.linux.do",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4.1 "
"Safari/605.1.15"
}
class Topic:
def __init__(self, client):
self.client = client
async def _fetch(self, url, **params):
response = await self.client.get(url, params=params)
return response.json()["topic_list"]["topics"]
async def latest(self, order="created"):
return await self._fetch("/latest.json", order=order)
async def top(self, period="daily"):
return await self._fetch("/top.json", period=period)
async def new(self):
return await self._fetch("/new.json")
async def unread(self):
return await self._fetch("/unread.json")
async def timings(self, data):
return await self.client.post("/topics/timings", data=data)
async def detail(self, topic_id: int):
response = await self.client.get(
f"/t/{topic_id}.json?track_visit=true"
)
return response.raise_for_status().json()
class Discourse:
def __init__(self, domain: str):
self.domain = domain
self.client = self.get_client()
self.topic = Topic(self.client)
@property
def conf(self) -> dict:
if not hasattr(self, "_conf"):
file = Path(__file__).parent / "conf.json"
if not file.exists() or not file.is_file():
raise FileNotFoundError("conf.json not exist")
with file.open("r") as fp:
conf = json.load(fp)
setattr(self, "_conf", conf)
return getattr(self, "_conf")
def get_client(self):
headers = copy.deepcopy(HEADERS)
headers["Referer"] = f"https://{self.domain}/"
headers["Host"] = self.domain
headers["X-CSRF-Token"] = self.conf["csrf-token"]
client = httpx.AsyncClient(
base_url=f"https://{self.domain}",
cookies=self.conf["cookies"],
headers=headers
)
return client
def connect(self):
response = httpx.get(
"https://connect.linux.do",
cookies=self.conf["connect-cookies"],
headers=CONNECT_HEADERS
)
html = response.content.decode()
doc = PyQuery(html)
info = doc("h1").text()
info = info.split(",")[1]
info = info.split(" ")[:-1]
# print(info)
print("Nickname:", info[0])
print("Username:", info[1][1:-1])
print("Level:", info[2])
def log(self, *args, **kwargs):
print(*args, **kwargs)
async def summary(self, username):
response = await self.client.get(f"/u/{username}/summary.json")
user = response.json()["user_summary"]
print("用户名:", username)
print("访问天数:", user["days_visited"])
print("帖子阅读时间:", f"{user['time_read'] // 3600}小时")
print("浏览的话题数:", user["topics_entered"])
print("帖子阅读数量:", user["posts_read_count"])
print("访问天数:", user["days_visited"])
print("送出点赞:", user["likes_given"])
print("收到点赞:", user["likes_received"])
print("创建话题:", user["topic_count"])
print("创建帖子:", user["post_count"])
async def topics_timings(
self,
topic_id: int,
highest_post_num: int,
last_read_post_num: int,
read_time: int = 10000
):
if last_read_post_num is None:
last_read_post_num = 0
read_count = highest_post_num - last_read_post_num
data = {
"topic_id": topic_id,
"topic_time": 0
}
for i in range(last_read_post_num, highest_post_num + 1):
data["topic_time"] += read_time
data[f'timings[{i}]'] = data["topic_time"]
response = await self.topic.timings(data=data)
if response.status_code == 200:
return read_count
return 0
async def like_post(self, post_id):
response = await self.client.put(
f"/discourse-reactions/posts/{post_id}/custom-reactions/heart/toggle.json"
)
if response.status_code == 200:
self.log(f"Successfully liked post: {post_id}")
return 1
elif response.status_code == 429:
# self.states['liked_topics'] = 75
# now = datetime.datetime.now()
resp = response.json()
wait_seconds = resp["extras"]["wait_seconds"]
# target_time = now + datetime.timedelta(seconds=wait_seconds)
# self.states['liked_wait_until'] = target_time.isoformat()
self.log(f'当前点赞到上限,再等待{resp["extras"]["time_left"]}')
else:
self.log(f"Failed to like post: {post_id}")
self.log(response.status_code, response.text)
return 0
async def auto_read_posts(self, topics):
total_read_posts = 0
for topic in topics:
read_count = await self.topics_timings(
topic["id"],
topic["highest_post_number"],
topic.get("last_read_post_number", None)
)
total_read_posts += read_count
self.log(f"读取topic {topic['id']}, {read_count}条评论")
await asyncio.sleep(1)
self.log(f'成功标记 {total_read_posts} posts 为读取')
async def auto_like(self):
total_liked_topics = 0
topics = await self.topic.new()
for topic in topics:
if topic["unseen"]:
topic_info = await self.topic.detail(topic["id"])
post_stream = topic_info["post_stream"]
first_post = post_stream["posts"][0]
like_count = await self.like_post(first_post["id"])
total_liked_topics += like_count
await asyncio.sleep(1)
self.log(f"送出点赞:{total_liked_topics}个")
async def run(self, username=None):
# await self.login()
# await self.auto_read_posts(await self.topic.unread())
# await self.auto_read_posts(await self.topic.new())
# await self.auto_read_posts(await self.topic.top())
# await self.auto_read_posts(await self.topic.latest())
await self.auto_like()
if username:
await self.summary(username)
if __name__ == '__main__':
d = Discourse("linux.do")
asyncio.run(d.run())
评论