import pymongo
from json import loads
from bson import ObjectId, json_util
from pydantic import BaseModel
from api_configs import MongoConfig
from pymongo import MongoClient
from pymongo.collection import Collection
from pymongo.results import InsertManyResult

# from configs import TestMongo as MongoConfig


def parse_json(data):
    """Round-trip *data* through ``bson.json_util`` so it is JSON-safe.

    The value is dumped with ``json_util.dumps`` and immediately parsed back
    with ``json.loads``, so the result contains only plain JSON types.
    """
    return loads(json_util.dumps(data))


def create_database_client(url, database_name):
    """Return the database *database_name* from a new ``MongoClient(url)``."""
    return MongoClient(url)[database_name]


class Paginate(BaseModel):
    """Pagination and sorting parameters for list queries."""

    pageSize: int = 10
    pageNumber: int = 1
    sortField: str = "_id"
    sortOrder: str = "desc"

    def grab_paginates(self):
        """Return ``(limit, skip, sort_field, sort_direction)``.

        ``skip`` is ``pageSize * (pageNumber - 1)``; ``sort_direction`` is
        ``-1`` when ``sortOrder`` is exactly ``"desc"`` and ``1`` otherwise.
        """
        size_ = self.pageSize
        return (
            size_,
            size_ * (self.pageNumber - 1),
            self.sortField,
            -1 if self.sortOrder == "desc" else 1,
        )


class MongoQuery:
    """Thin CRUD wrapper around a single MongoDB collection."""

    def __init__(self, table_name: str, database_name: str):
        """Bind to collection *table_name* in *database_name*.

        Connects to ``MongoConfig.url`` and creates the collection when it is
        not already listed in the database.
        """
        database = create_database_client(MongoConfig.url, database_name)
        # ``Database.collection_names()`` was removed in PyMongo 4.0;
        # ``list_collection_names()`` is the supported replacement.
        if table_name not in database.list_collection_names():
            database.create_collection(name=table_name)
        self.table: Collection = database[table_name]

    @staticmethod
    def _build_filter(match, field: str = "id") -> dict:
        """Build the query filter shared by the single-document operations.

        ``field == "id"`` targets the document's ``_id`` (with *match*
        converted to ``ObjectId``); any other *field* is matched verbatim.
        """
        if field == "id":
            return {"_id": ObjectId(match)}
        return {field: match}

    @staticmethod
    def grab_paginates(paginate):
        """Return ``(limit, skip, sort_field, sort_direction)`` for *paginate*.

        Accepts this module's ``Paginate`` model, or any object exposing
        ``size``, ``page``, ``order_field`` and ``order_type`` attributes.
        """
        if isinstance(paginate, Paginate):
            # ``Paginate`` has no ``size``/``page``/... attributes, so use
            # its own accessor instead of failing with AttributeError.
            return paginate.grab_paginates()
        return (
            paginate.size,
            paginate.size * (paginate.page - 1),
            paginate.order_field,
            -1 if paginate.order_type == "desc" else 1,
        )

    @staticmethod
    def parse_json(data):
        """Return *data* converted to plain JSON types (see ``parse_json``)."""
        # Inside a method body the bare name resolves to the module-level
        # function, not to this static method.
        return parse_json(data)

    def insert(self, payload) -> InsertManyResult:
        """Insert *payload* as a single document.

        The document is wrapped in a one-element list and passed to
        ``insert_many``, whose result is returned.
        """
        return self.table.insert_many(documents=[payload])

    def update(self, match, payload, field: str = "id"):
        """Apply ``$set: payload`` to the first document matching *match*.

        With the default ``field="id"`` the document is looked up by
        ``_id == ObjectId(match)``; otherwise by ``{field: match}``.
        Exactly one ``update_one`` call is issued.
        """
        self.table.update_one(
            filter=self._build_filter(match, field),
            update={"$set": payload},
        )

    def get_one(self, match, field: str = "id"):
        """Return the first document matching *match*, JSON-converted.

        With the default ``field="id"`` the lookup is by
        ``_id == ObjectId(match)``; otherwise by ``{field: match}``.
        """
        document = self.table.find_one(filter=self._build_filter(match, field))
        return self.parse_json(data=document)

    def filter_by(self, payload, sort_by: str = "_id", sort_direction: str = "asc"):
        """Return all documents matching *payload*, sorted by *sort_by*.

        *sort_direction* is compared case-insensitively: ``"asc"`` sorts
        ascending, anything else sorts descending.
        """
        sort_direction = (
            pymongo.ASCENDING
            if str(sort_direction).lower() == "asc"
            else pymongo.DESCENDING
        )
        return_ = self.table.find(payload).sort(sort_by, sort_direction)
        return self.parse_json(data=return_)

    def delete_one(self, match, field: str = "id"):
        """Delete the first document matching *match*.

        With the default ``field="id"`` the document is looked up by
        ``_id == ObjectId(match)``; otherwise by ``{field: match}``.
        Exactly one ``delete_one`` call is issued.
        """
        self.table.delete_one(filter=self._build_filter(match, field))

    def list_all(self, paginate: Paginate):
        """Return ``(total_count, page_of_documents)`` for *paginate*.

        ``total_count`` counts every document in the collection, not just
        the returned page.
        """
        size, skip, field, order = paginate.grab_paginates()
        return_ = self.table.find().sort([(field, order)]).skip(skip).limit(size)
        return self.table.count_documents({}), self.parse_json(data=return_)

    def get_all(self):
        """Return ``(total_count, all_documents)`` for the collection."""
        return_ = self.table.find()
        return self.table.count_documents({}), self.parse_json(data=return_)


# Mongo = MongoQuery(table_name="XcompanyConfig", database_name="mongo_database")