125 lines
5.1 KiB
Python
125 lines
5.1 KiB
Python
import random
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
from sqlalchemy import String
|
|
from sqlalchemy.orm import mapped_column, Mapped
|
|
|
|
from cryptography.fernet import Fernet, MultiFernet
|
|
from databases.sql_models.core_mixin import CrudCollection
|
|
|
|
|
|
class CrypterEngine(CrudCollection):
|
|
|
|
__tablename__ = "crypter_engine"
|
|
__table_args__ = ()
|
|
encrypt_list = []
|
|
decrypt_list = []
|
|
keys_error = "Unable to retrieve encrypt keys"
|
|
alchemy_error = "Alchemy object is empty"
|
|
|
|
key_first: Mapped[str] = mapped_column(String, nullable=False)
|
|
key_second: Mapped[str] = mapped_column(String, nullable=False)
|
|
|
|
@classmethod
|
|
def get_valid_keys(cls, row=None):
|
|
cls.encrypt_list, cls.decrypt_list = [], []
|
|
if not cls.filter_all(cls.created_at > datetime.now() - timedelta(days=29)).get(
|
|
1
|
|
):
|
|
cls.create_encrypt_keys(count=100)
|
|
if decrypt_identifier := getattr(row, "cryp_uu_id", None):
|
|
if decrypt_row := cls.find_one(uu_id=str(decrypt_identifier)):
|
|
return (
|
|
decrypt_row.key_first.decode(),
|
|
decrypt_row.key_second.decode(),
|
|
decrypt_row.uu_id,
|
|
)
|
|
if encrypt_rows := cls.filter_all(
|
|
cls.created_at > datetime.now() - timedelta(days=29)
|
|
).data:
|
|
encrypt_row = random.choice(encrypt_rows)
|
|
return (
|
|
encrypt_row.key_first.encode(),
|
|
encrypt_row.key_second.encode(),
|
|
encrypt_rows.uu_id,
|
|
)
|
|
return None, None, None
|
|
|
|
@classmethod
|
|
def create_encrypt_keys(cls, count: int):
|
|
for _ in range(count):
|
|
key_first = Fernet.generate_key()
|
|
key_second = Fernet.generate_key()
|
|
cls.find_or_create(
|
|
key_first=key_first.decode(), key_second=key_second.decode()
|
|
)
|
|
|
|
@classmethod
|
|
def raise_exception(cls, message=None):
|
|
raise Exception(message if message else cls.keys_error)
|
|
|
|
@classmethod
|
|
def encrypt_given_alchemy_list(cls, alchemy_object_list: list):
|
|
for alchemy_object in alchemy_object_list:
|
|
key_first, key_second, cryp_uu_id = cls.get_valid_keys()
|
|
fernet_keys = MultiFernet([Fernet(key_first), Fernet(key_second)])
|
|
if not key_first or not key_second:
|
|
cls.raise_exception()
|
|
alchemy_dict = alchemy_object.get_dict() if alchemy_object else None
|
|
if not alchemy_dict:
|
|
cls.raise_exception(cls.alchemy_error)
|
|
for key, plain_row in alchemy_dict.items():
|
|
if key in alchemy_object.__encrypt_list__:
|
|
alchemy_dict[key] = fernet_keys.encrypt(plain_row).decode()
|
|
alchemy_dict["cryp_uu_id"] = cryp_uu_id
|
|
cls.encrypt_list.append(alchemy_object.update(**alchemy_dict))
|
|
return cls.encrypt_list
|
|
|
|
@classmethod
|
|
def encrypt_given_alchemy_object(cls, alchemy_object_object):
|
|
key_first, key_second, cryp_uu_id = cls.get_valid_keys()
|
|
fernet_keys = MultiFernet([Fernet(key_first), Fernet(key_second)])
|
|
if not key_first or not key_second:
|
|
cls.raise_exception()
|
|
alchemy_dict = (
|
|
alchemy_object_object.get_dict() if alchemy_object_object else None
|
|
)
|
|
if not alchemy_dict:
|
|
cls.raise_exception(cls.alchemy_error)
|
|
for key, plain_row in alchemy_dict.items():
|
|
if key in alchemy_object_object.__encrypt_list__:
|
|
alchemy_dict[key] = fernet_keys.encrypt(plain_row).decode()
|
|
alchemy_dict["cryp_uu_id"] = cryp_uu_id
|
|
return alchemy_object_object.update(**alchemy_dict)
|
|
|
|
@classmethod
|
|
def decrypt_given_alchemy(cls, alchemy_object_list: list):
|
|
for alchemy_object in alchemy_object_list:
|
|
key_first, key_second, cryp_uu_id = cls.get_valid_keys(row=alchemy_object)
|
|
fernet_keys = MultiFernet([Fernet(key_first), Fernet(key_second)])
|
|
if not key_first or not key_second:
|
|
cls.raise_exception()
|
|
alchemy_dict = alchemy_object.get_dict() if alchemy_object else None
|
|
if not alchemy_dict:
|
|
cls.raise_exception(cls.alchemy_error)
|
|
for key, plain_row in alchemy_dict.items():
|
|
if key in alchemy_object.__encrypt_list__:
|
|
alchemy_dict[key] = fernet_keys.decrypt(plain_row).decode()
|
|
cls.decrypt_list.append(alchemy_dict)
|
|
return cls.decrypt_list
|
|
|
|
@classmethod
|
|
def decrypt_given_alchemy_object(cls, alchemy_object):
|
|
key_first, key_second, cryp_uu_id = cls.get_valid_keys(row=alchemy_object)
|
|
fernet_keys = MultiFernet([Fernet(key_first), Fernet(key_second)])
|
|
if not key_first or not key_second:
|
|
cls.raise_exception()
|
|
alchemy_dict = alchemy_object.get_dict() if alchemy_object else None
|
|
if not alchemy_dict:
|
|
cls.raise_exception(cls.alchemy_error)
|
|
for key, plain_row in alchemy_dict.items():
|
|
if key in alchemy_object.__encrypt_list__:
|
|
alchemy_dict[key] = fernet_keys.decrypt(plain_row).decode()
|
|
return alchemy_dict
|