Source code for db.dbConnection

import logging
import uuid

# Module-level database handle. Starts as an empty-string placeholder and is
# rebound to the real handle by DbConnectionClass.__init__ (via ``global``).
database = ""

# NOTE(review): presumably a prefix for debug/log messages; it is not
# referenced anywhere in the visible code.
DEBUG_T = "[DbConnection] "


class DbConnectionClass:
    '''Class containing methods for DB management.

    Methods
    -------
    verify
    identity
    is_admin
    check_if_email_exists
    create_user
    authenticate
    get_users_available_uses
    reduce_uses
    save_request
    '''

    def __init__(self, dbi, config):
        '''Binds the module-level ``database`` handle and stores the config.

        Parameters
        ----------
        dbi:
            Database handle. Its ``session`` attribute is used by
            ``create_user`` and ``save_request``.
        config:
            Mapping that must contain the key ``"SQLALCHEMY_DATABASE_URI"``.

        Raises
        ------
        KeyError
            If ``"SQLALCHEMY_DATABASE_URI"`` is missing from ``config``.
        '''
        # The handle is shared through a module global, so the most recently
        # constructed instance decides which handle every instance uses.
        global database
        database = dbi
        self.__app_config = config
        self.__db_URI = config["SQLALCHEMY_DATABASE_URI"]

    def verify(self, email, password):
        '''Verifies the credentials.

        Parameters
        ----------
        email: str
            Users email.
        password: str
            Users password.

        Returns
        -------
        db.dbModels.User, None or False
            ``False`` when either argument is empty; otherwise the result of
            ``authenticate`` (a User on success, ``None`` on failure).
        '''
        if not (email and password):
            return False
        return self.authenticate(email, password)

    # def close(self):
    #     self.__connection.close()

    def identity(self, payload):
        '''Returns an object with user_id field.

        Parameters
        ----------
        payload: dict
            Mapping holding the user id under the ``'identity'`` key.

        Returns
        -------
        dict
            ``{"user_id": <payload['identity']>}``
        '''
        return {"user_id": payload['identity']}

    def is_admin(self, identity):
        '''Tells whether the user with given id has administrative rights.

        Parameters
        ----------
        identity: int
            Users ID.

        Returns
        -------
        bool
            ``False`` as well when no user with that id exists.
        '''
        user = User.query.filter_by(id=identity).first()
        if user is None:
            # Unknown user: previously this raised AttributeError on None.
            return False
        return bool(user.is_admin)

    def check_if_email_exists(self, email):
        '''Returns true if email already exists in DB.

        Parameters
        ----------
        email: str
            Email to check.

        Returns
        -------
        bool
        '''
        return bool(User.query.filter_by(email=email).first())

    def create_user(self, email, uses, is_admin):
        '''Creates a new user with a freshly generated password.

        Parameters
        ----------
        email: str
            Users email.
        uses: int
            Users uses.
        is_admin: int
            Bool cast. 1 if the user has to be admin.

        Returns
        -------
        str
            The generated password (32 hexadecimal characters).
        '''
        # Same value as str(uuid.uuid4()).replace("-", "").
        password = uuid.uuid4().hex
        # NOTE(review): the password is handed to User as generated, while
        # authenticate() compares against a bcrypt hash - presumably the
        # model hashes it on assignment; confirm in db.dbModels.
        user = User(email=email, password=password, uses=uses,
                    is_admin=is_admin)
        database.session.add(user)
        database.session.commit()
        return password

    def authenticate(self, email, password):
        '''Returns User object if proper credentials. Otherwise returns None.

        Parameters
        ----------
        email: str
            Users email.
        password: str
            Users password.

        Returns
        -------
        db.dbModels.User or None
            ``None`` when the email is unknown or the password does not match.
        '''
        user = User.query.filter_by(email=email).first()
        if user is None:
            # Unknown email: fail the login instead of crashing on None.
            return None
        if bcrypt.check_password_hash(user.password, password):
            return user
        return None

    def get_users_available_uses(self, id):
        '''Returns amount of available POST requests for given user's id.

        Parameters
        ----------
        id: int
            Users ID.

        Returns
        -------
        int
            Whatever the user's ``get_uses()`` returns.

        Raises
        ------
        AttributeError
            If no user with that id exists (the lookup yields ``None``).
        '''
        user = User.query.filter_by(id=id).first()
        return user.get_uses()

    def reduce_uses(self, id):
        '''Reduces available user's uses by 1.

        Parameters
        ----------
        id: int
            Users ID.

        Returns
        -------
        int
            Whatever the user's ``subtract_use()`` returns.

        Raises
        ------
        AttributeError
            If no user with that id exists (the lookup yields ``None``).
        '''
        user = User.query.filter_by(id=id).first()
        return user.subtract_use()

    @staticmethod
    def _truncate(value, limit):
        '''Cuts ``value`` to ``limit`` characters plus '..' when it is longer.'''
        if len(value) > limit:
            return value[:limit] + '..'
        return value

    def save_request(self, request_type, request_url, response, user_id,
                     headers, is_xhr, data_type, data):
        '''Saves request details in DB.

        ``request_url`` is cut to 64 characters and ``response``, ``headers``
        and ``data`` to 1024 characters; a cut value gets '..' appended.

        Parameters
        ----------
        request_type: str
            Request type (POST, PUT etc.)
        request_url: str
            Requests URL e.g http://localhost:8000/v2/test1
        response: str
            Models response to the request
        user_id: int
            ID of user that sent request
        headers: str
            Headers sent with the request
        is_xhr: bool
            Was the request a XHR request.
        data_type: char
            Type of data sent 'I' - image etc.
        data: str
            Path to saved image or text.
        '''
        # NOTE(review): a truncated value is limit + 2 characters long because
        # of the '..' marker; confirm the DB columns allow for that.
        request_entry = Request(
            request_type=request_type,
            request_url=self._truncate(request_url, 64),
            response=self._truncate(response, 1024),
            user_id=user_id,
            headers=self._truncate(headers, 1024),
            is_xhr=is_xhr,
            data_type=data_type,
            data=self._truncate(data, 1024))
        database.session.add(request_entry)
        database.session.commit()
from mlapi.app import bcrypt from db.dbModels import User from db.dbModels import Request