wag-managment-api-service-v.../a_project_files/later_use_codes/joined_selection.py

112 lines
4.1 KiB
Python

@classmethod
def select_action(cls, duty_id, filter_expr: list = None):
    """Query this model joined with ``RelationshipDutyUser`` for one duty.

    Rows are joined on ``RelationshipDutyUser.member_id == cls.id`` and are
    always restricted to ``RelationshipDutyUser.duty_id == duty_id``.

    Args:
        duty_id: Value compared against ``RelationshipDutyUser.duty_id``.
        filter_expr: Optional extra filter criteria applied in addition to
            the duty restriction.

    Returns:
        An ``AlchemyResponse`` built from ``query``, ``data`` and ``count``.
        When ``filter_expr`` is given, ``data`` is the raw ``query.all()``
        result. Otherwise every row is reshaped into
        ``{table_name: {column_name: value}}``.
    """
    from application.base import AlchemyResponse

    columns = [*cls.__table__.columns, *RelationshipDutyUser.__table__.columns]

    # The duty restriction always applies; caller criteria are appended to it.
    criteria = [RelationshipDutyUser.duty_id == duty_id]
    if filter_expr is not None:
        criteria.extend(filter_expr)

    query = (
        cls.session.query(*columns)
        .select_from(cls)
        .join(RelationshipDutyUser, RelationshipDutyUser.member_id == cls.id)
        .filter(*criteria)
    )
    rows = query.all()

    if filter_expr is not None:
        # NOTE(review): the filtered path hands back raw rows while the
        # unfiltered path returns nested dicts. Kept as-is to avoid changing
        # what existing callers receive -- confirm whether this is intended.
        return AlchemyResponse(query=query, data=rows, count=query.count())

    # Keep (table, column) pairs as tuples instead of joining them with "*"
    # and splitting again, which would break on names containing "*".
    labels = [(str(column.table), str(column.name)) for column in columns]
    nested_rows = []
    for row in rows:
        nested = {}
        for (table, name), value in zip(labels, row):
            nested.setdefault(table, {})[name] = value
        nested_rows.append(nested)

    # An empty result stays an empty list; the query is not executed again.
    return AlchemyResponse(query=query, data=nested_rows, count=query.count())
def create_filter_with_options(model, options: ListOptions, duty_id):
    """Fetch records for ``model`` and wrap them, serialized, in a response.

    ``options.query`` (when truthy) is stored on ``model.query_filter`` and
    ``options`` itself on ``model.filter_attr`` before fetching. Records come
    from ``model.select_action(duty_id=duty_id)`` when the model defines
    ``select_action``, otherwise from ``model.filter_by_non_deleted()``.

    Args:
        model: Model class to query; its ``query_filter`` / ``filter_attr``
            attributes are assigned here.
        options: Listing options; only ``options.query`` is read directly.
        duty_id: Forwarded to ``model.select_action``.

    Returns:
        An ``AlchemyResponse`` carrying the fetched ``query`` and ``count``.
        ``data`` is ``[]`` when ``count`` is falsy; otherwise it is the
        ``get_dict()`` of each record when the fetched ``data`` object exposes
        ``get_dict``, or ``model.joined_to_dict(data)`` when it does not.
    """
    from application.base import AlchemyResponse

    if options.query:
        model.query_filter = options.query
    # NOTE(review): assumed to be assigned unconditionally -- confirm.
    model.filter_attr = options

    records = (
        model.select_action(duty_id=duty_id)
        if hasattr(model, "select_action")
        else model.filter_by_non_deleted()
    )

    exposes_get_dict = hasattr(records.data, "get_dict")
    if not records.count:
        payload = []
    elif exposes_get_dict:
        payload = [entry.get_dict() for entry in records.data]
    else:
        payload = model.joined_to_dict(records.data)

    return AlchemyResponse(
        query=records.query,
        count=records.count,
        data=payload,
    )
@classmethod
def joined_to_dict(cls, joined_list: list):
    """Serialize joined rows into plain, per-table dictionaries.

    Args:
        joined_list: Iterable of rows, each shaped as
            ``{table_name: {column_name: value}}``.

    Returns:
        A new list with one independent ``{table_name: {column: value}}``
        dict per input row. Columns named in ``cls.__exclude__fields__``,
        the fixed names ``is_confirmed``/``deleted``/``replication_id``/``id``
        and any column whose name contains ``"_id"`` are dropped, except
        ``uu_id``, which is always kept as a string. Remaining values map as:
        ``None`` -> ``None``; ``active`` and booleans -> ``bool``; other
        ints/floats -> ``float``; everything else -> ``str``.
    """
    excluded = {
        *cls.__exclude__fields__,
        "is_confirmed",
        "deleted",
        "replication_id",
        "id",
    }
    serialized_list = []
    for joined_row in joined_list:
        # A fresh dict per row: reusing one dict across rows would make every
        # list entry alias the same object and show only the last row.
        row_dict = {}
        for table_name, columns in joined_row.items():
            cleaned = {}
            for key, val in columns.items():
                if key == "uu_id":
                    # Kept even though its name matches the "_id" filter.
                    cleaned[key] = str(val)
                    continue
                if key in excluded or "_id" in key:
                    continue
                if val is None:
                    cleaned[key] = None
                elif key == "active" or isinstance(val, bool):
                    # bool must be tested before int: bool subclasses int and
                    # would otherwise be converted to a float.
                    cleaned[key] = bool(val)
                elif isinstance(val, (int, float)):
                    cleaned[key] = float(val)
                else:
                    cleaned[key] = str(val)
            row_dict[table_name] = cleaned
        serialized_list.append(row_dict)
    return serialized_list