101 lines
3.1 KiB
Python
101 lines
3.1 KiB
Python
import pymongo
|
|
|
|
from json import loads
|
|
from bson import ObjectId, json_util
|
|
from pydantic import BaseModel
|
|
from api_configs import MongoConfig
|
|
|
|
from pymongo import MongoClient
|
|
from pymongo.collection import Collection
|
|
from pymongo.results import InsertManyResult
|
|
|
|
# from configs import TestMongo as MongoConfig
|
|
|
|
|
|
def parse_json(data):
    """Round-trip ``data`` through BSON-aware JSON to get plain Python objects.

    ``data`` is serialised to text with ``bson.json_util.dumps`` and that
    text is parsed back with ``json.loads``; the parsed value is returned.
    """
    serialized = json_util.dumps(data)
    return loads(serialized)
|
|
|
|
|
|
def create_database_client(url, database_name):
    """Create a ``MongoClient`` for ``url`` and return its ``database_name`` database."""
    client = MongoClient(url)
    return client[database_name]
|
|
|
|
|
|
class Paginate(BaseModel):
    """Pagination and sorting parameters for a list query."""

    # Number of documents per page.
    pageSize: int = 10
    # 1-based page index: page 1 skips nothing.
    pageNumber: int = 1
    # Field the result set is ordered by.
    sortField: str = "_id"
    # "desc" (any letter case) sorts descending; anything else sorts ascending.
    sortOrder: str = "desc"

    def grab_paginates(self):
        """Return ``(limit, skip, sort_field, sort_direction)`` for a query.

        Returns:
            A 4-tuple of the page size, the number of documents to skip
            before the requested page, the sort field name, and ``-1`` for
            descending or ``1`` for ascending order.
        """
        # Clamp so that pageNumber < 1 or a negative pageSize can never
        # yield a negative skip, which pymongo's Cursor.skip rejects.
        page_index = max(self.pageNumber - 1, 0)
        skip = max(self.pageSize, 0) * page_index
        # Compare case-insensitively so "DESC" is not silently treated as
        # ascending.
        direction = -1 if str(self.sortOrder).lower() == "desc" else 1
        return (self.pageSize, skip, self.sortField, direction)
|
|
|
|
|
|
class MongoQuery:
    """Small CRUD helper bound to a single MongoDB collection.

    Every method operates on ``self.table``, the collection selected in
    ``__init__``. Read methods pass their results through ``parse_json``
    before returning them.
    """

    def __init__(self, table_name: str, database_name: str):
        """Connect to ``MongoConfig.url`` and bind to one collection.

        Args:
            table_name: Name of the collection to operate on.
            database_name: Name of the database holding that collection.
        """
        database = MongoClient(MongoConfig.url)[database_name]
        # No explicit create_collection() call: MongoDB creates a collection
        # implicitly on the first write to it.
        self.table: "Collection" = database[table_name]

    @staticmethod
    def grab_paginates(paginate):
        """Return ``(limit, skip, sort_field, sort_direction)`` for ``paginate``.

        Objects that provide their own ``grab_paginates()`` method are asked
        directly. Otherwise the legacy attribute names ``size``, ``page``,
        ``order_field`` and ``order_type`` are read.

        Returns:
            A 4-tuple; the last item is ``-1`` when the order is ``"desc"``
            and ``1`` otherwise.
        """
        own_grab = getattr(paginate, "grab_paginates", None)
        if callable(own_grab):
            return own_grab()
        return (
            paginate.size,
            paginate.size * (paginate.page - 1),
            paginate.order_field,
            -1 if paginate.order_type == "desc" else 1,
        )

    @staticmethod
    def parse_json(data):
        """Serialise ``data`` with ``json_util.dumps`` and parse it back with ``loads``."""
        return loads(json_util.dumps(data))

    @staticmethod
    def _match_filter(match, field):
        """Build the query filter for ``match`` on ``field``.

        ``field == "id"`` is treated as the document's ``_id`` and ``match``
        is converted with ``ObjectId``; any other field is matched as given.
        """
        if field == "id":
            return {"_id": ObjectId(match)}
        return {field: match}

    def insert(self, payload) -> "InsertManyResult":
        """Insert ``payload`` as a single document.

        The document is wrapped in a one-element list and sent through
        ``insert_many``, so the return value is an ``InsertManyResult``.
        """
        return self.table.insert_many(documents=[payload])

    def update(self, match, payload, field: str = "id"):
        """Apply ``{"$set": payload}`` to the first document matching ``match``.

        Args:
            match: Value to look up; an id string when ``field`` is ``"id"``.
            payload: Mapping of fields to set on the matched document.
            field: Field to match on; ``"id"`` means the document ``_id``.
        """
        # Exactly one update is issued. The "_id" lookup must not fall
        # through to a second update filtered on a literal "id" field.
        self.table.update_one(
            filter=self._match_filter(match, field),
            update={"$set": payload},
        )

    def get_one(self, match, field: str = "id"):
        """Return the first document matching ``match``, passed through ``parse_json``.

        Args:
            match: Value to look up; an id string when ``field`` is ``"id"``.
            field: Field to match on; ``"id"`` means the document ``_id``.
        """
        document = self.table.find_one(filter=self._match_filter(match, field))
        return self.parse_json(data=document)

    def filter_by(self, payload, sort_by: str = "_id", sort_direction: str = "asc"):
        """Return all documents matching ``payload``, sorted by ``sort_by``.

        Args:
            payload: Query filter passed to ``find``.
            sort_by: Field to sort on.
            sort_direction: ``"asc"`` (any letter case) sorts ascending;
                any other value sorts descending.
        """
        ascending = str(sort_direction).lower() == "asc"
        direction = pymongo.ASCENDING if ascending else pymongo.DESCENDING
        cursor = self.table.find(payload).sort(sort_by, direction)
        return self.parse_json(data=cursor)

    def delete_one(self, match, field: str = "id"):
        """Delete the first document matching ``match``.

        Args:
            match: Value to look up; an id string when ``field`` is ``"id"``.
            field: Field to match on; ``"id"`` means the document ``_id``.
        """
        # Exactly one delete is issued. The "_id" lookup must not fall
        # through to a second delete filtered on a literal "id" field.
        self.table.delete_one(filter=self._match_filter(match, field))

    def list_all(self, paginate: "Paginate"):
        """Return ``(total_count, page_of_documents)`` for ``paginate``.

        ``total_count`` counts every document in the collection, not just
        those on the returned page.
        """
        size, skip, field, order = self.grab_paginates(paginate)
        cursor = self.table.find().sort([(field, order)]).skip(skip).limit(size)
        return self.table.count_documents({}), self.parse_json(data=cursor)

    def get_all(self):
        """Return ``(total_count, all_documents)`` for the collection."""
        cursor = self.table.find()
        return self.table.count_documents({}), self.parse_json(data=cursor)
|
|
|
|
|
|
# Mongo = MongoQuery(table_name="XcompanyConfig", database_name="mongo_database")
|