added structured api endpoints and done some refactoring
Some checks failed
Python formatting PEP8 / Python-PEP8 (push) Failing after 18s

This commit is contained in:
Finn Christiansen 2023-08-12 00:20:51 +02:00
parent a689b1c56d
commit d8607212da
15 changed files with 362 additions and 57 deletions

0
app/api/__init__.py Normal file
View file

6
app/api/api.py Normal file
View file

@ -0,0 +1,6 @@
from fastapi import APIRouter
from .endpoints import items
api_router = APIRouter()
api_router.include_router(items.router, prefix="/items", tags=["items"])

11
app/api/deps.py Normal file
View file

@ -0,0 +1,11 @@
from typing import Generator
from db.database import SessionLocal
def get_db() -> Generator:
try:
db = SessionLocal()
yield db
finally:
db.close()

View file

View file

@ -0,0 +1,56 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
import crud
import models
from schemas.item import Item, ItemCreate, ItemUpdate
from api import deps
router = APIRouter()
@router.get("/{item_id}", response_model=Item)
async def get_item(
*,
db: Session = Depends(deps.get_db),
item_id: int
):
return crud.item.get(db, item_id)
@router.get("/", response_model=list[Item])
async def get_items(
skip: int = 0,
limit: int = 100,
db: Session = Depends(deps.get_db)
):
return crud.item.get_multi(db, skip=skip, limit=limit)
@router.post("/", response_model=Item)
async def create_item(
item: ItemCreate,
db: Session = Depends(deps.get_db)
):
return crud.item.create(db=db, item=item)
@router.put("/{item_id}", response_model=ItemUpdate)
async def update_item(
item: ItemUpdate,
db: Session = Depends(deps.get_db)
):
return crud.item.update(db=db, item=item)
@router.delete("/{item_id}")
async def delete_item(
*,
item: ItemUpdate,
db: Session = Depends(deps.get_db),
item_id: int
):
return crud.remove(item_id)

View file

@ -0,0 +1,96 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.core import security
from app.core.config import settings
from app.core.security import get_password_hash
from app.utils import (
generate_password_reset_token,
send_reset_password_email,
verify_password_reset_token,
)
router = APIRouter()
@router.post("/login/access-token", response_model=schemas.Token)
def login_access_token(
db: Session = Depends(deps.get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = crud.user.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(status_code=400, detail="Incorrect email or password")
elif not crud.user.is_active(user):
raise HTTPException(status_code=400, detail="Inactive user")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/login/test-token", response_model=schemas.User)
def test_token(current_user: models.User = Depends(deps.get_current_user)) -> Any:
"""
Test access token
"""
return current_user
@router.post("/password-recovery/{email}", response_model=schemas.Msg)
def recover_password(email: str, db: Session = Depends(deps.get_db)) -> Any:
"""
Password Recovery
"""
user = crud.user.get_by_email(db, email=email)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system.",
)
password_reset_token = generate_password_reset_token(email=email)
send_reset_password_email(
email_to=user.email, email=email, token=password_reset_token
)
return {"msg": "Password recovery email sent"}
@router.post("/reset-password/", response_model=schemas.Msg)
def reset_password(
token: str = Body(...),
new_password: str = Body(...),
db: Session = Depends(deps.get_db),
) -> Any:
"""
Reset password
"""
email = verify_password_reset_token(token)
if not email:
raise HTTPException(status_code=400, detail="Invalid token")
user = crud.user.get_by_email(db, email=email)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system.",
)
elif not crud.user.is_active(user):
raise HTTPException(status_code=400, detail="Inactive user")
hashed_password = get_password_hash(new_password)
user.hashed_password = hashed_password
db.add(user)
db.commit()
return {"msg": "Password updated successfully"}

153
app/api/endpoints/users.py Normal file
View file

@ -0,0 +1,153 @@
from typing import Any, List
from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from pydantic.networks import EmailStr
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.core.config import settings
from app.utils import send_new_account_email
router = APIRouter()
@router.get("/", response_model=List[schemas.User])
def read_users(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Retrieve users.
"""
users = crud.user.get_multi(db, skip=skip, limit=limit)
return users
@router.post("/", response_model=schemas.User)
def create_user(
*,
db: Session = Depends(deps.get_db),
user_in: schemas.UserCreate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create new user.
"""
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this username already exists in the system.",
)
user = crud.user.create(db, obj_in=user_in)
if settings.EMAILS_ENABLED and user_in.email:
send_new_account_email(
email_to=user_in.email, username=user_in.email, password=user_in.password
)
return user
@router.put("/me", response_model=schemas.User)
def update_user_me(
*,
db: Session = Depends(deps.get_db),
password: str = Body(None),
full_name: str = Body(None),
email: EmailStr = Body(None),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update own user.
"""
current_user_data = jsonable_encoder(current_user)
user_in = schemas.UserUpdate(**current_user_data)
if password is not None:
user_in.password = password
if full_name is not None:
user_in.full_name = full_name
if email is not None:
user_in.email = email
user = crud.user.update(db, db_obj=current_user, obj_in=user_in)
return user
@router.get("/me", response_model=schemas.User)
def read_user_me(
db: Session = Depends(deps.get_db),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.post("/open", response_model=schemas.User)
def create_user_open(
*,
db: Session = Depends(deps.get_db),
password: str = Body(...),
email: EmailStr = Body(...),
full_name: str = Body(None),
) -> Any:
"""
Create new user without the need to be logged in.
"""
if not settings.USERS_OPEN_REGISTRATION:
raise HTTPException(
status_code=403,
detail="Open user registration is forbidden on this server",
)
user = crud.user.get_by_email(db, email=email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this username already exists in the system",
)
user_in = schemas.UserCreate(password=password, email=email, full_name=full_name)
user = crud.user.create(db, obj_in=user_in)
return user
@router.get("/{user_id}", response_model=schemas.User)
def read_user_by_id(
user_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
db: Session = Depends(deps.get_db),
) -> Any:
"""
Get a specific user by id.
"""
user = crud.user.get(db, id=user_id)
if user == current_user:
return user
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return user
@router.put("/{user_id}", response_model=schemas.User)
def update_user(
*,
db: Session = Depends(deps.get_db),
user_id: int,
user_in: schemas.UserUpdate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Update a user.
"""
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system",
)
user = crud.user.update(db, db_obj=user, obj_in=user_in)
return user

View file

@ -0,0 +1,35 @@
from typing import Any
from fastapi import APIRouter, Depends
from pydantic.networks import EmailStr
from app import models, schemas
from app.api import deps
from app.core.celery_app import celery_app
from app.utils import send_test_email
router = APIRouter()
@router.post("/test-celery/", response_model=schemas.Msg, status_code=201)
def test_celery(
msg: schemas.Msg,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Test Celery worker.
"""
celery_app.send_task("app.worker.test_celery", args=[msg.msg])
return {"msg": "Word received"}
@router.post("/test-email/", response_model=schemas.Msg, status_code=201)
def test_email(
email_to: EmailStr,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Test emails.
"""
send_test_email(email_to=email_to)
return {"msg": "Test email sent"}

View file

@ -5,7 +5,7 @@ from pydantic import BaseModel
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from fastapi import HTTPException from fastapi import HTTPException
from database import Base from db.database import Base
ModelType = TypeVar("ModelType", bound=Base) ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel) CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)

View file

@ -1,5 +1,5 @@
from models import Item from models import Item
from schemas.schemas import ItemCreate, ItemUpdate from schemas.item import ItemCreate, ItemUpdate
from crud.base import CRUDBase from crud.base import CRUDBase

0
app/db/__init__.py Normal file
View file

View file

@ -1,9 +1,8 @@
from fastapi import Depends, FastAPI from fastapi import Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from schemas import schemas
import database
import crud import crud
from api.api import api_router
app = FastAPI() app = FastAPI()
app.add_middleware( app.add_middleware(
@ -11,60 +10,9 @@ app.add_middleware(
allow_origins=["*"] allow_origins=["*"]
) )
app.include_router(api_router)
# Dependency
def get_db():
db = database.SessionLocal()
try:
yield db
finally:
db.close()
@app.get("/") @app.get("/")
async def root(): async def root():
return {"message": "Hello World"} return {"message": "Hello World"}
@app.get("/items/{item_id}", response_model=schemas.Item)
async def get_item(
*,
db: Session = Depends(get_db),
item_id: int
):
return crud.item.get(db, item_id)
@app.get("/items/", response_model=list[schemas.Item])
async def get_items(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
return crud.item.get_multi(db, skip=skip, limit=limit)
@app.post("/items/", response_model=schemas.Item)
async def create_item(
item: schemas.ItemCreate,
db: Session = Depends(get_db)
):
return crud.item.create(db=db, item=item)
@app.put("/items/{item_id}", response_model=schemas.ItemUpdate)
async def update_item(
item: schemas.ItemUpdate,
db: Session = Depends(get_db)
):
return crud.item.update(db=db, item=item)
@app.delete("/items/{item_id}")
async def delete_item(
*,
item: schemas.ItemUpdate,
db: Session = Depends(get_db),
item_id: int
):
return crud.remove(item_id)

View file

@ -1,6 +1,6 @@
from sqlalchemy import Column, Integer, String from sqlalchemy import Column, Integer, String
from database import Base from db.database import Base
class Item(Base): class Item(Base):