Github地址:https://github.com/sqwqwqw1/xxqg

查询学习强国答案的小程序,实际上不值一提,但是却是我第一次写出来的小玩意。

其实是偷的别人服务器的内容(http://www.syiban.com)。

核心程序是server里的query.py,实际上,只需要run_query(关键词)就可以得到答案了。学了两天fastapi,感觉好像可以写成个接口试试,就写了。

运行server里的api.py后,即可调用http://127.0.0.1:8000/xxqg?key=keyword(本机或者服务器ip)获取查询结果,GET或POST请求均可。

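使用了jwt验证:先向/token发送POST请求(表单字段username和password)获取access_token,再在查询请求的请求头中携带Authorization: Bearer <access_token>。只有一个用户,用户名user01,密码password。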

也可以不自己写代码调用接口,直接运行client里面的xxqg.py。

目录结构
server
|--query.py   #查询程序
|--api.py     #定义接口
client
|--xxqg.py    #调用接口实现查询

query.py

import requests, re
from bs4 import BeautifulSoup

# Patterns are compiled once at import time instead of on every result row.
# _ZERO_WIDTH_SPACE strips U+200B from question text; _ANSWER_NOISE strips
# U+200B, the "答案:" label, option prefixes such as "A、" / "B.", and all
# whitespace from answer text.
_ZERO_WIDTH_SPACE = re.compile(r'\u200b')
_ANSWER_NOISE = re.compile(r'\u200b|答案:|[A-F][、.]|\s')


def query(key, timeout=10):
    """Search www.syiban.com for *key* and return the scraped question/answer pairs.

    Args:
        key: Search keyword. It is sent as a URL query parameter so that
            characters such as ``&``, ``#`` or spaces are encoded correctly.
        timeout: Seconds to wait for the remote site before ``requests``
            raises a timeout error (prevents the caller from hanging forever).

    Returns:
        A list of ``{'question': str, 'answer': str}`` dicts, one per result
        entry found on the page; an empty list when nothing matched.

    Raises:
        requests.RequestException: On connection problems or timeout.
    """
    url = 'http://www.syiban.com/search/index/init.html'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
        'Referer': 'http://www.syiban.com/',
    }
    # Let requests build the query string so the keyword is percent-encoded.
    r = requests.get(
        url,
        params={'modelid': 1, 'q': key},
        headers=headers,
        timeout=timeout,
    )
    soup = BeautifulSoup(r.text, 'lxml')
    answer_li = []
    for entry in soup.find_all('div', 'yzm-news-right'):
        title = entry.find('span', 'title_color')
        if title is None:
            # Skip an entry without a title rather than failing the whole query.
            continue
        question = _ZERO_WIDTH_SPACE.sub('', title.text)
        # The answer is taken from the last <span> inside the result entry.
        answer = _ANSWER_NOISE.sub('', entry.find_all('span')[-1].text)
        answer_li.append({'question': question, 'answer': answer})
    return answer_li

def run_query(q):
    """Run a search for *q* and wrap the outcome in a status dictionary.

    The remote lookup is performed exactly once per call.

    Args:
        q: Search keyword passed through to ``query``.

    Returns:
        ``{'status': 'success', 'data': [...], 'total': n}`` when results
        were found, otherwise ``{'status': 'failed', 'error': message}``.
    """
    try:
        results = query(q)
    except Exception:
        # Any scraping/network error is reported to the caller as a failure
        # payload; KeyboardInterrupt/SystemExit are deliberately not caught.
        return {'status':'failed','error': '查询失败,没有相应结果!'}
    if not results:
        return {'status':'failed','error': '查询结果为空!'}
    return {'status': 'success', 'data': results, 'total': len(results)}

api.py

from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from fastapi import FastAPI, Form, Depends, HTTPException, status
from fastapi.responses import JSONResponse
from passlib.context import CryptContext
from datetime import timedelta, datetime
from jose import JWTError, jwt
from pydantic import BaseModel
from query import run_query
from typing import Union
import uvicorn

app = FastAPI()
# Alternative that turns off the interactive docs pages (/docs and /redoc):
# app = FastAPI(docs_url=None, redoc_url=None)

# OAuth2 bearer scheme; clients obtain their token from the /token endpoint.
oauth2_schema = OAuth2PasswordBearer(tokenUrl='/token')
# Passwords are hashed with md5_crypt.
# NOTE(review): md5_crypt is a legacy scheme; consider a stronger one
# (e.g. bcrypt or argon2) before using this beyond a demo.
pwd_context = CryptContext(schemes=["md5_crypt"], deprecated="auto")
# Key and algorithm used to sign the JWTs; the key can be any random value.
# NOTE(review): the signing key is hard-coded in source; it should be loaded
# from the environment or a secrets store in a real deployment.
SECRET_KEY = "fdc891e49e9f4d526b8fdd7d53bf0a8dd40035de30e7e9c3db4d1035e2e05d60"
ALGORITHM = "HS256"
# Access tokens expire after 30 minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# A fake user table (plain dict) standing in for a real database.
users_db = {
    'user01':{
        'username': 'user01',
        'password': '$1$Nyu4AoP1$trb8LJzUXrbroQB60Ha51.',  # hash of the plaintext password "password"
    }
}

# Response schema of the /token endpoint.
class Token(BaseModel):
    access_token:str  # the encoded JWT string
    token_type:str  # the /token endpoint sets this to "bearer"

# User record as stored in users_db.
class UserInDB(BaseModel):
    username:str
    password:str  # the stored password hash, not the plaintext

# Check whether a plaintext password matches a stored hash.
def verify_password(plain_password:str, hashed_password:str):
    # Delegate the comparison to the module's passlib context.
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match

# Look up a user record and wrap it in a UserInDB model.
def get_user(users_db, username:str):
    # Unknown usernames yield None so callers can test the result for truth.
    if username not in users_db:
        return None
    record = users_db[username]
    return UserInDB(**record)

# Verify that the user exists in the table and that the password matches.
# Returns the user object on success and False otherwise.
def authenticate_user(users_db, username, password):
    account = get_user(users_db, username)
    if account and verify_password(plain_password=password, hashed_password=account.password):
        return account
    return False

# Create a signed JWT from the given claims.
def generate_token(data:dict, expires_delta:Union[timedelta, None] = None):
    """Return an encoded JWT containing *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: Token lifetime. When omitted (or zero) the token
            expires 15 minutes from now.

    Returns:
        The JWT encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Local import: the file's top-level import only brings in
    # ``timedelta`` and ``datetime``.
    from datetime import timezone

    lifetime = expires_delta or timedelta(minutes=15)
    # Use an aware UTC timestamp; ``datetime.utcnow()`` returns a naive
    # value and is deprecated in recent Python versions.
    expire = datetime.now(timezone.utc) + lifetime
    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

# Login endpoint: exchanges form-encoded username/password for a bearer token.
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    account = authenticate_user(users_db, form_data.username, form_data.password)
    if account:
        # Credentials are valid: issue a token naming the user as its subject.
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        signed_jwt = generate_token(
            data={"sub": account.username}, expires_delta=lifetime
        )
        return {"access_token": signed_jwt, "token_type": "bearer"}
    # Unknown user or wrong password.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )

def _require_valid_token(token: str) -> str:
    """Decode and validate the bearer JWT and return its subject.

    Without this check the routes would accept any string as a token,
    because the OAuth2 dependency only extracts the header value.

    Raises:
        HTTPException: 401 when the token cannot be decoded (malformed,
            bad signature, expired) or names a user missing from ``users_db``.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized from None
    subject = claims.get("sub")
    if subject is None or get_user(users_db, subject) is None:
        raise unauthorized
    return subject


async def _run_authorized_query(key: str, token: str) -> JSONResponse:
    """Validate *token*, run the lookup for *key* and build the JSON response."""
    from fastapi.concurrency import run_in_threadpool

    _require_valid_token(token)
    # run_query performs blocking HTTP I/O; run it in a worker thread so the
    # event loop stays free to serve other requests.
    result = await run_in_threadpool(run_query, key)
    return JSONResponse(result, headers={'content-type': 'application/json;charset=utf-8'})


# GET query endpoint: the keyword is read from the query string.
@app.get('/xxqg')
async def query_qg(key:str, token:str = Depends(oauth2_schema)):
    return await _run_authorized_query(key, token)


# POST query endpoint: the keyword is read from the form body.
# The function name is reused from the GET handler; each route is registered
# by its decorator, so both remain active.
@app.post('/xxqg')
async def query_qg(key:str = Form(...), token:str = Depends(oauth2_schema)):
    return await _run_authorized_query(key, token)

if __name__ == '__main__':
    # When run as a script, serve the app on localhost:8000 with a single
    # worker and auto-reload disabled.
    uvicorn.run('api:app', host='localhost', port=8000, reload=False, workers=1)

xxqg.py

import requests
import os

def clear_screen():
    """Clear the terminal with the command appropriate for the host OS."""
    os.system('cls' if os.name == 'nt' else 'clear')


def fetch_auth_headers():
    """Log in at the token endpoint and return the Authorization header dict.

    Raises:
        requests.HTTPError: When the server rejects the login request.
    """
    resp = requests.post(token_url, user_dict)
    # Fail with a clear HTTP error instead of a KeyError on 'access_token'.
    resp.raise_for_status()
    return {"Authorization": "Bearer " + resp.json()['access_token']}


clear_screen()

token_url = 'http://localhost:8000/token'
user_dict = {
    'username':'user01',
    'password':'password',
}
headers = fetch_auth_headers()

url = 'http://localhost:8000/xxqg'
# Prompt for keywords until the user presses Ctrl-C or sends EOF.
while True:
    try:
        q = input("请输入查询字符:")
    except (EOFError, KeyboardInterrupt):
        # Leave the loop quietly instead of printing a traceback.
        print()
        break
    clear_screen()
    if q == '':
        continue
    r = requests.post(url, data={'key': q}, headers=headers)
    if r.status_code == 401:
        # The token was rejected (e.g. it expired): log in again and retry once.
        headers = fetch_auth_headers()
        r = requests.post(url, data={'key': q}, headers=headers)
    # Parse the response body once and reuse it.
    result = r.json()
    if result.get('status') == 'success':
        for i in result['data']:
            print(f"题目:{i['question']}\n答案:{i['answer']}\n\n")
    else:
        print(result)

话说,实际上只要在客户端运行query就可以了,真是多此一举啊……