Github地址:https://github.com/sqwqwqw1/xxqg
查询学习强国答案的小程序,实际上不值一提,但是却是我第一次写出来的小玩意。
其实是偷的别人服务器的内容(http://www.syiban.com)。
核心程序是server里的query.py,实际上,只需要run_query(关键词)就可以得到答案了。学了两天fastapi,感觉好像可以写成个接口试试,就写了。
运行server里的api.py,就可以调用 http://127.0.0.1:8000/xxqg?key=keyword (地址可换成本机或者服务器ip),返回结果,GET或POST请求均可。
使用了jwt验证,只有一个用户,用户名user01,密码password。
也可以不自己写代码调用接口,直接运行client里面的xxqg.py。
目录结构:
server
|--query.py  #查询程序
|--api.py    #定义接口
client
|--xxqg.py   #调用接口实现查询
query.py
import re

import requests
from bs4 import BeautifulSoup

# Upstream search page; the keyword is passed as the ``q`` query parameter.
SEARCH_URL = 'http://www.syiban.com/search/index/init.html'
REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
    'Referer': 'http://www.syiban.com/',
}
# Seconds to wait for the upstream site before giving up.
REQUEST_TIMEOUT = 10

# Zero-width spaces that appear inside scraped question text.
_ZERO_WIDTH_RE = re.compile(r'\u200b')
# Everything stripped from a scraped answer: zero-width spaces, the
# "答案:" prefix, option labels such as "A、" / "A." and all whitespace.
_ANSWER_NOISE_RE = re.compile(
    r'\u200b|答案:|A、|B、|C、|D、|E、|F、|A\.|B\.|C\.|D\.|E\.|F\.|\s'
)


def query(key):
    """Search the upstream site for ``key`` and scrape the matches.

    Args:
        key: Search keyword (part of a question).

    Returns:
        A list of ``{'question': str, 'answer': str}`` dicts, one per result
        entry found on the page; empty when nothing matched.

    Raises:
        requests.RequestException: On network failure or timeout.
    """
    # Let requests build the query string so characters such as '&' or '#'
    # in the keyword are percent-encoded instead of corrupting the URL.
    response = requests.get(
        SEARCH_URL,
        params={'modelid': 1, 'q': key},
        headers=REQUEST_HEADERS,
        timeout=REQUEST_TIMEOUT,
    )
    soup = BeautifulSoup(response.text, 'lxml')

    answers = []
    for entry in soup.find_all('div', 'yzm-news-right'):
        title = entry.find('span', 'title_color')
        if title is None:
            # Skip a malformed entry rather than failing the whole query.
            continue
        question = _ZERO_WIDTH_RE.sub('', title.text)
        # The answer is taken from the last <span> of the entry.
        answer = _ANSWER_NOISE_RE.sub('', entry.find_all('span')[-1].text)
        answers.append({'question': question, 'answer': answer})
    return answers


def run_query(q):
    """Run :func:`query` and wrap the outcome in a status envelope.

    Args:
        q: Search keyword.

    Returns:
        ``{'status': 'success', 'data': [...], 'total': n}`` when results
        were found, otherwise ``{'status': 'failed', 'error': message}``.
        This function never raises for ordinary errors.
    """
    try:
        # Query once and reuse the result; each call is a remote HTTP request.
        results = query(q)
    except Exception:
        # Boundary handler: callers expect a failure envelope, not a raise.
        return {'status': 'failed', 'error': '查询失败,没有相应结果!'}
    if not results:
        return {'status': 'failed', 'error': '查询结果为空!'}
    return {'status': 'success', 'data': results, 'total': len(results)}
api.py
from datetime import datetime, timedelta, timezone
from typing import Union

import uvicorn
from fastapi import Depends, FastAPI, Form, HTTPException, status
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

from query import run_query

# To hide the interactive docs, create the app with
# FastAPI(docs_url=None, redoc_url=None) instead.
app = FastAPI()

# Extracts the bearer token from the Authorization header. It does NOT
# validate the token; validation happens in verify_token() below.
oauth2_schema = OAuth2PasswordBearer(tokenUrl='/token')

# Passwords are stored as md5_crypt hashes.
# NOTE(review): md5_crypt is a weak scheme; consider migrating to bcrypt or
# argon2 (requires re-hashing the stored passwords).
pwd_context = CryptContext(schemes=["md5_crypt"], deprecated="auto")

# Key and algorithm used to sign JWTs.
# NOTE(review): the secret is hard-coded and published with the source;
# generate your own and load it from the environment for real deployments.
SECRET_KEY = "fdc891e49e9f4d526b8fdd7d53bf0a8dd40035de30e7e9c3db4d1035e2e05d60"
ALGORITHM = "HS256"

# Token lifetime in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# A stand-in user table (plain dict).
users_db = {
    'user01': {
        'username': 'user01',
        # md5_crypt hash of the plaintext "password".
        'password': '$1$Nyu4AoP1$trb8LJzUXrbroQB60Ha51.',
    }
}


class Token(BaseModel):
    """Response body of the ``/token`` endpoint."""

    access_token: str
    token_type: str


class UserInDB(BaseModel):
    """A user record as stored in ``users_db`` (password is the hash)."""

    username: str
    password: str


def verify_password(plain_password: str, hashed_password: str):
    """Return True if ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_user(users_db, username: str):
    """Return the ``UserInDB`` for ``username``, or None if unknown."""
    if username in users_db:
        user_dict = users_db[username]
        return UserInDB(**user_dict)
    return None


def authenticate_user(users_db, username, password):
    """Return the user if the credentials are valid, otherwise False."""
    user = get_user(users_db, username)
    if not user:
        return False
    if not verify_password(plain_password=password, hashed_password=user.password):
        return False
    return user


def generate_token(data: dict, expires_delta: Union[timedelta, None] = None):
    """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed (not mutated).
        expires_delta: Token lifetime; defaults to 15 minutes when omitted.

    Returns:
        The encoded JWT string.
    """
    to_encode = data.copy()
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    # Timezone-aware UTC replaces the deprecated, naive datetime.utcnow().
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def verify_token(token: str = Depends(oauth2_schema)) -> str:
    """Dependency that validates the bearer token and returns it.

    The signature and expiry are checked by ``jwt.decode``; the ``sub`` claim
    must name a user present in ``users_db``.

    Raises:
        HTTPException: 401 when the token is malformed, expired, wrongly
            signed, or refers to an unknown user.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception from None
    username = payload.get("sub")
    if username is None or get_user(users_db, username) is None:
        raise credentials_exception
    return token


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange a username/password form for a bearer access token."""
    user = authenticate_user(users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = generate_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


# The query handlers are plain ``def`` on purpose: run_query() performs a
# blocking HTTP request, and FastAPI runs sync handlers in a thread pool so
# the event loop is not stalled.
@app.get('/xxqg')
def query_qg(key: str, token: str = Depends(verify_token)):
    """GET lookup: ``/xxqg?key=<keyword>``; requires a valid bearer token."""
    return JSONResponse(
        run_query(key),
        headers={'content-type': 'application/json;charset=utf-8'},
    )


@app.post('/xxqg')
def query_qg_post(key: str = Form(...), token: str = Depends(verify_token)):
    """POST lookup with form field ``key``; requires a valid bearer token."""
    return JSONResponse(
        run_query(key),
        headers={'content-type': 'application/json;charset=utf-8'},
    )


if __name__ == '__main__':
    uvicorn.run('api:app', host='localhost', port=8000, reload=False, workers=1)
xxqg.py
import os

import requests

TOKEN_URL = 'http://localhost:8000/token'
QUERY_URL = 'http://localhost:8000/xxqg'
CREDENTIALS = {
    'username': 'user01',
    'password': 'password',
}
# Seconds to wait for the local API before giving up on a request.
REQUEST_TIMEOUT = 30


def clear_screen():
    """Clear the terminal ('cls' on Windows, 'clear' elsewhere)."""
    os.system('cls' if os.name == 'nt' else 'clear')


def login():
    """Log in and return the Authorization header for later requests.

    Raises:
        requests.RequestException: If the server is unreachable or rejects
            the credentials (non-2xx response).
    """
    response = requests.post(TOKEN_URL, data=CREDENTIALS, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return {"Authorization": "Bearer " + response.json()['access_token']}


def search(keyword, headers):
    """POST ``keyword`` to the query endpoint and return the raw response."""
    return requests.post(
        QUERY_URL, data={'key': keyword}, headers=headers, timeout=REQUEST_TIMEOUT
    )


def main():
    """Interactive loop: read a keyword, query the API, print the answers."""
    clear_screen()
    headers = login()
    while True:
        q = input("请输入查询字符:")
        clear_screen()
        if q == '':
            continue
        try:
            response = search(q, headers)
            if response.status_code == 401:
                # The token has a limited lifetime; log in again and retry once.
                headers = login()
                response = search(q, headers)
            result = response.json()
        except (requests.RequestException, ValueError) as exc:
            # Network failure or a non-JSON body: report it and keep running.
            print(exc)
            continue
        # .get() because error bodies (e.g. {'detail': ...}) carry no 'status'.
        if result.get('status') == 'success':
            for item in result['data']:
                print(f"题目:{item['question']}\n答案:{item['answer']}\n\n")
        else:
            print(result)


if __name__ == '__main__':
    try:
        main()
    except (KeyboardInterrupt, EOFError):
        # Ctrl-C / Ctrl-D ends the session without a traceback.
        print()
话说,实际上只要在客户端运行query就可以了,真是多此一举啊……
Comments | NOTHING