updated Service Runner email Reader
This commit is contained in:
parent
a830cc079d
commit
81184a8acc
Binary file not shown.
|
|
@ -0,0 +1,240 @@
|
||||||
|
import os
|
||||||
|
|
||||||
|
from redis import Redis
|
||||||
|
from json import dumps, loads
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import List, Dict, Any, Union
|
||||||
|
from email import policy
|
||||||
|
from email.message import EmailMessage
|
||||||
|
from email.headerregistry import UniqueDateHeader, UniqueAddressHeader, UniqueUnstructuredHeader
|
||||||
|
from email.parser import BytesParser
|
||||||
|
from imaplib import IMAP4_SSL
|
||||||
|
from config import EmailConfig, Config
|
||||||
|
|
||||||
|
|
||||||
|
email_config = EmailConfig()
|
||||||
|
config = Config()
|
||||||
|
redis_client = Redis(
|
||||||
|
host='10.10.2.15',
|
||||||
|
password='your_strong_password_here',
|
||||||
|
port=6379,
|
||||||
|
db=0
|
||||||
|
)
|
||||||
|
|
||||||
|
class Mails:
|
||||||
|
|
||||||
|
def __init__(self, mail_id: bytes, mail_data: bytes):
|
||||||
|
self.id: bytes = mail_id
|
||||||
|
self.raw_data: bytes = mail_data
|
||||||
|
self.attachments: List[Dict[str, Union[str, bytes]]] = []
|
||||||
|
self.message: EmailMessage = BytesParser(policy=policy.default).parsebytes(mail_data)
|
||||||
|
self.subject: UniqueUnstructuredHeader = self.message.get('Subject', '') or ''
|
||||||
|
self.from_: UniqueAddressHeader = self.message.get('From', '') or ''
|
||||||
|
self.to: UniqueAddressHeader = self.message.get('To', '') or ''
|
||||||
|
self.date: UniqueDateHeader = self.message.get('Date', '') or ''
|
||||||
|
self.body_text: str = self._get_body_text()
|
||||||
|
self._extract_attachments()
|
||||||
|
|
||||||
|
|
||||||
|
def to_dict(self) -> Dict[str, Any]:
|
||||||
|
return {
|
||||||
|
'id': self.id.decode('utf-8'),
|
||||||
|
# 'raw_data': self.raw_data.decode('utf-8'),
|
||||||
|
'attachments': [{
|
||||||
|
'filename': attachment['filename'],
|
||||||
|
'content_type': attachment['content_type'],
|
||||||
|
'charset': attachment['charset'],
|
||||||
|
'data': attachment['data'].decode(attachment['charset'], errors='replace')
|
||||||
|
} for attachment in self.attachments],
|
||||||
|
# 'message': self.message.as_string(),
|
||||||
|
'subject': str(self.subject),
|
||||||
|
'from_': {
|
||||||
|
"display_name": self.from_.addresses[0].display_name,
|
||||||
|
"username": self.from_.addresses[0].username,
|
||||||
|
"domain": self.from_.addresses[0].domain,
|
||||||
|
"mail": f"{self.from_.addresses[0].username}@{self.from_.addresses[0].domain}"
|
||||||
|
},
|
||||||
|
'to': [
|
||||||
|
{
|
||||||
|
"display_name": address.display_name,
|
||||||
|
"username": address.username,
|
||||||
|
"domain": address.domain,
|
||||||
|
"mail": f"{address.username}@{address.domain}"
|
||||||
|
} for address in self.to.addresses
|
||||||
|
],
|
||||||
|
'date': str(self.date.datetime),
|
||||||
|
'body_text': str(self.body_text)
|
||||||
|
}
|
||||||
|
|
||||||
|
def _get_body_text(self) -> str:
|
||||||
|
body = self.message.get_body(preferencelist=('plain',))
|
||||||
|
if body is not None:
|
||||||
|
return body.get_content() or ''
|
||||||
|
if self.message.is_multipart():
|
||||||
|
for part in self.message.walk():
|
||||||
|
if part.get_content_type() == 'text/plain' and (part.get_content_disposition() or '') != 'attachment':
|
||||||
|
try:
|
||||||
|
return part.get_content() or ''
|
||||||
|
except Exception:
|
||||||
|
payload = part.get_payload(decode=True) or b''
|
||||||
|
return payload.decode(part.get_content_charset() or 'utf-8', errors='replace')
|
||||||
|
else:
|
||||||
|
if self.message.get_content_type() == 'text/plain':
|
||||||
|
try:
|
||||||
|
return self.message.get_content() or ''
|
||||||
|
except Exception:
|
||||||
|
payload = self.message.get_payload(decode=True) or b''
|
||||||
|
return payload.decode(self.message.get_content_charset() or 'utf-8', errors='replace')
|
||||||
|
return ''
|
||||||
|
|
||||||
|
def _extract_attachments(self) -> None:
|
||||||
|
for part in self.message.walk():
|
||||||
|
if part.get_content_disposition() == 'attachment':
|
||||||
|
filename = part.get_filename()
|
||||||
|
if not filename:
|
||||||
|
continue
|
||||||
|
data = part.get_payload(decode=True) or b''
|
||||||
|
charset = part.get_charset() or 'utf-8'
|
||||||
|
self.attachments.append(
|
||||||
|
{'filename': filename, 'content_type': part.get_content_type(), 'data': data, 'charset': charset}
|
||||||
|
)
|
||||||
|
|
||||||
|
def save_attachments(self, folder: str) -> None:
|
||||||
|
os.makedirs(folder, exist_ok=True)
|
||||||
|
for att in self.attachments:
|
||||||
|
with open(os.path.join(folder, att['filename']), 'wb') as f:
|
||||||
|
f.write(att['data'])
|
||||||
|
|
||||||
|
|
||||||
|
class EmailReaderIsbankService:
|
||||||
|
|
||||||
|
NO_ATTACHMENT_FOLDER = "NoAttachment"
|
||||||
|
COMPLETED_FOLDER = "Completed"
|
||||||
|
|
||||||
|
def __init__(self, email_config: EmailConfig, config: Config):
|
||||||
|
self.email_config = email_config
|
||||||
|
self.config = config
|
||||||
|
self.mail = IMAP4_SSL(email_config.EMAIL_HOST, email_config.EMAIL_PORT)
|
||||||
|
self.mail.login(email_config.EMAIL_USERNAME, email_config.EMAIL_PASSWORD)
|
||||||
|
self.data: List[Mails] = []
|
||||||
|
self.inc: int = 100
|
||||||
|
self.start: int = 0
|
||||||
|
self.end: int = self.inc
|
||||||
|
|
||||||
|
self._connect_inbox()
|
||||||
|
self.mail_count = self._fetch_count()
|
||||||
|
self._fetch_all()
|
||||||
|
|
||||||
|
|
||||||
|
def _connect_inbox(self):
|
||||||
|
"""INBOX'a bağlanır"""
|
||||||
|
status, _ = self.mail.select("INBOX")
|
||||||
|
if status != 'OK':
|
||||||
|
raise Exception("INBOX'a bağlanılamadı")
|
||||||
|
|
||||||
|
def _fetch_count(self):
|
||||||
|
status, uids = self.mail.uid('SORT', '(REVERSE DATE)', 'UTF-8', 'ALL', 'FROM', f'"{self.config.MAILBOX}"')
|
||||||
|
if status != 'OK':
|
||||||
|
raise Exception("Mail sayısı alınamadı")
|
||||||
|
return len(uids[0].split())
|
||||||
|
|
||||||
|
def _fetch_all(self):
|
||||||
|
"""Tüm mailleri çeker ve self.data'ya Mails objesi olarak ekler"""
|
||||||
|
status, uids = self.mail.uid('SORT', '(REVERSE DATE)', 'UTF-8', 'ALL', 'FROM', f'"{self.config.MAILBOX}"')
|
||||||
|
if status != 'OK':
|
||||||
|
raise Exception("Mail arama başarısız")
|
||||||
|
for uid in uids[0].split():
|
||||||
|
status, msg_data = self.mail.uid('fetch', uid, '(RFC822)')
|
||||||
|
if status == 'OK' and msg_data[0] is not None:
|
||||||
|
self.data.append(Mails(uid, msg_data[0][1]))
|
||||||
|
|
||||||
|
def mark_no_attachment(self, uid: bytes):
|
||||||
|
"""Mesajı arşive taşır"""
|
||||||
|
self.mail.uid('COPY', uid, self.NO_ATTACHMENT_FOLDER)
|
||||||
|
self.delete(uid)
|
||||||
|
|
||||||
|
def mark_completed(self, uid: bytes):
|
||||||
|
"""Mesajı arşive taşır"""
|
||||||
|
self.mail.uid('COPY', uid, self.COMPLETED_FOLDER)
|
||||||
|
# self.delete(uid)
|
||||||
|
|
||||||
|
def delete(self, uid: bytes):
|
||||||
|
"""Mesajı siler"""
|
||||||
|
self.mail.uid('STORE', uid, '+FLAGS', r'(\Deleted)')
|
||||||
|
|
||||||
|
def commit(self):
|
||||||
|
"""Bekleyen silme/taşıma işlemlerini uygular"""
|
||||||
|
self.mail.expunge()
|
||||||
|
|
||||||
|
def logout(self):
|
||||||
|
self.mail.logout()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def count(self):
|
||||||
|
return len(self.data)
|
||||||
|
|
||||||
|
service = EmailReaderIsbankService(email_config, config)
|
||||||
|
mails = service.data
|
||||||
|
count = 0
|
||||||
|
redis_prefix = "Bank:Services:Task"
|
||||||
|
|
||||||
|
my_service = "mail"
|
||||||
|
for mail in mails:
|
||||||
|
if not getattr(mail, 'id', None):
|
||||||
|
continue
|
||||||
|
mail_id = mail.id.decode('utf-8')
|
||||||
|
if mail.attachments:
|
||||||
|
not_seen = redis_client.sadd(f'{redis_prefix}:Seen', mail_id)
|
||||||
|
index_of_set_data_get = redis_client.get(f'{redis_prefix}:Index')
|
||||||
|
if not_seen:
|
||||||
|
mail_to_dict = mail.to_dict()
|
||||||
|
mail_without_attachments = mail_to_dict.copy()
|
||||||
|
mail_without_attachments.pop('attachments', None)
|
||||||
|
write_object = {
|
||||||
|
'id': mail_id,
|
||||||
|
'data': {
|
||||||
|
"mail": mail_without_attachments,
|
||||||
|
"parser": mail_to_dict.get('attachments', []),
|
||||||
|
"ibanFinder": None,
|
||||||
|
"commentFinder": None
|
||||||
|
},
|
||||||
|
'created_at': datetime.now().isoformat(),
|
||||||
|
'completed': True,
|
||||||
|
'status': 'red',
|
||||||
|
'service': my_service,
|
||||||
|
# 'bank': 'isbank',
|
||||||
|
'is_completed': False
|
||||||
|
}
|
||||||
|
redis_write_ = redis_client.rpush(f'{redis_prefix}:Data', dumps(write_object))
|
||||||
|
if redis_write_:
|
||||||
|
if index_of_set_data_get:
|
||||||
|
index_of_set_data_get = loads(index_of_set_data_get)
|
||||||
|
index_of_set_data_get[str(mail_id)] = redis_write_ - 1
|
||||||
|
else:
|
||||||
|
index_of_set_data_get = {str(mail_id): redis_write_ - 1}
|
||||||
|
index_of_set_data_set = redis_client.set(f'{redis_prefix}:Index', dumps(index_of_set_data_get))
|
||||||
|
count += 1
|
||||||
|
else:
|
||||||
|
redis_client.spop(f'{redis_prefix}:Seen', mail_id)
|
||||||
|
else:
|
||||||
|
get_index = redis_client.get(f'{redis_prefix}:Index')
|
||||||
|
if not get_index:
|
||||||
|
continue
|
||||||
|
get_index = loads(get_index)
|
||||||
|
if get_index.get(str(mail_id), None):
|
||||||
|
object_from_redis = redis_client.lindex(f'{redis_prefix}:Data', int(get_index[str(mail_id)]))
|
||||||
|
if object_from_redis:
|
||||||
|
object_from_redis = loads(object_from_redis)
|
||||||
|
is_completed = object_from_redis.get('is_completed', False)
|
||||||
|
id_ = object_from_redis.get('data', {}).get('id', None)
|
||||||
|
if not mail_id == id_:
|
||||||
|
raise Exception("Mail id not match with id from redis")
|
||||||
|
if is_completed:
|
||||||
|
service.mark_completed(mail_id)
|
||||||
|
else:
|
||||||
|
service.mark_no_attachment(mail_id)
|
||||||
|
|
||||||
|
service.commit()
|
||||||
|
service.logout()
|
||||||
|
|
||||||
|
print("Total Mails: ", f"{count}/{service.count}")
|
||||||
|
|
@ -0,0 +1,37 @@
|
||||||
|
import os
|
||||||
|
|
||||||
|
class Config:
|
||||||
|
|
||||||
|
MAILBOX: str = os.getenv("MAILBOX", "bilgilendirme@ileti.isbank.com.tr")
|
||||||
|
EMAIL_HOST: str = os.getenv("EMAIL_HOST", "10.10.2.34")
|
||||||
|
EMAIL_LOGIN_USER: str = os.getenv("EMAIL_READER_ADDRESS", "isbank@mehmetkaratay.com.tr")
|
||||||
|
EMAIL_LOGIN_PASSWORD: str = os.getenv("EMAIL_LOGIN_PASSWORD", "system")
|
||||||
|
AUTHORIZE_IBAN: str = os.getenv("AUTHORIZE_IBAN", "4245-0093333")
|
||||||
|
EMAIL_PORT: int = int(os.getenv("EMAIL_PORT", 993))
|
||||||
|
|
||||||
|
|
||||||
|
class EmailConfig:
|
||||||
|
|
||||||
|
EMAIL_HOST: str = Config.EMAIL_HOST
|
||||||
|
EMAIL_USERNAME: str = Config.EMAIL_LOGIN_USER
|
||||||
|
EMAIL_PASSWORD: str = Config.EMAIL_LOGIN_PASSWORD
|
||||||
|
EMAIL_PORT: int = Config.EMAIL_PORT
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def as_dict(cls):
|
||||||
|
return dict(
|
||||||
|
host=EmailConfig.EMAIL_HOST,
|
||||||
|
port=EmailConfig.EMAIL_PORT,
|
||||||
|
username=EmailConfig.EMAIL_USERNAME,
|
||||||
|
password=EmailConfig.EMAIL_PASSWORD
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# INFO_MAIL: str = os.getenv("INFO_MAIL", "mehmet.karatay@hotmail.com")
|
||||||
|
# EMAIL_SEND: bool = bool(os.getenv("EMAIL_SEND", False))
|
||||||
|
# EMAIL_SEND_PORT: int = int(os.getenv("EMAIL_SEND_PORT", 587))
|
||||||
|
# EMAIL_SLEEP: int = int(os.getenv("EMAIL_SLEEP", 60))
|
||||||
|
# SERVICE_TIMING: int = int(os.getenv("SERVICE_TIMING", 900))
|
||||||
|
# EMAIL_LOGIN_USER: str = os.getenv("EMAIL_LOGIN_USER", "karatay@mehmetkaratay.com.tr")
|
||||||
|
# MAIN_MAIL: str = os.getenv("MAIN_MAIL", "karatay.berkay@gmail.com")
|
||||||
|
# EMAIL_SEND: bool = Config.EMAIL_SEND
|
||||||
Loading…
Reference in New Issue