432 lines
15 KiB
Python
432 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Created on Sat Apr 30 13:13:23 2022
|
|
|
|
@author: elena
|
|
"""
|
|
import os
|
|
import uuid
|
|
import hashlib
|
|
import datetime
|
|
from filelock import FileLock
|
|
|
|
from django.conf import settings
|
|
|
|
#from django.db.models import Q
|
|
from django.db import transaction
|
|
|
|
from django.utils import timezone
|
|
|
|
from .models import ReceipeImage, Article, Purchase, PurchaseArticle,ArticleMaps
|
|
|
|
from .loggers import LoggingMixin
|
|
from .parser import ReceipeParser
|
|
from .image_processing import crop_binarize, crop_binarize_scanner
|
|
from .file_handling import create_source_path_directory
|
|
|
|
# Status-message identifiers passed to Consumer._send_progress(); the
# listening frontend is expected to map these keys to display text.
MESSAGE_RECEIPE_ALREADY_EXISTS = "receipe_already_exists"
MESSAGE_FILE_NOT_FOUND = "file_not_found"
# Pre/post consume script hooks -- the scripts themselves are not wired
# up anywhere in this file; presumably handled elsewhere (TODO confirm).
MESSAGE_PRE_CONSUME_SCRIPT_NOT_FOUND = "pre_consume_script_not_found"
MESSAGE_PRE_CONSUME_SCRIPT_ERROR = "pre_consume_script_error"
MESSAGE_POST_CONSUME_SCRIPT_NOT_FOUND = "post_consume_script_not_found"
MESSAGE_POST_CONSUME_SCRIPT_ERROR = "post_consume_script_error"
# Progress-stage markers emitted while a file is being consumed.
MESSAGE_NEW_FILE = "new_file"
MESSAGE_UNSUPPORTED_TYPE = "unsupported_type"
MESSAGE_PARSING_RECEIPE = "parsing_receipe"
MESSAGE_GENERATING_THUMBNAIL = "generating_thumbnail"
MESSAGE_PARSE_DATE = "parse_date"
MESSAGE_SAVE_RECEIPE = "save_receipe"
MESSAGE_FINISHED = "finished"
|
|
|
|
class ConsumerError(Exception):
    """Raised when a receipe file cannot be consumed (see Consumer._fail)."""
|
|
|
|
class Consumer(LoggingMixin):
    """Consumes a single receipe image file: runs pre-checks, parses it,
    and stores the results in the database (entry point: try_consume_file).
    """

    # Logger name used by LoggingMixin for this component.
    logging_name = "receipeServer.consumer"
|
|
|
def _send_progress(self, current_progress, max_progress, status,
|
|
message=None, document_id=None):
|
|
payload = {
|
|
'filename': os.path.basename(self.filename) if self.filename else None, # NOQA: E501
|
|
'task_id': self.task_id,
|
|
'current_progress': current_progress,
|
|
'max_progress': max_progress,
|
|
'status': status,
|
|
'message': message,
|
|
'document_id': document_id
|
|
}
|
|
#async_to_sync(self.channel_layer.group_send)("status_updates",
|
|
# {'type': 'status_update',
|
|
# 'data': payload})
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.path = None
|
|
self.filename = None
|
|
self.task_id = None
|
|
|
|
#self.channel_layer = get_channel_layer()
|
|
|
|
def _fail(self, message, log_message=None, exc_info=None):
|
|
self._send_progress(100, 100, 'FAILED', message)
|
|
self.log("error", log_message or message, exc_info=exc_info)
|
|
raise ConsumerError(f"{self.filename}: {log_message or message}")
|
|
|
|
def pre_check_file_exists(self):
|
|
if not os.path.isfile(self.path):
|
|
self._fail(
|
|
MESSAGE_FILE_NOT_FOUND,
|
|
f"Cannot consume {self.path}: File not found."
|
|
)
|
|
|
|
def pre_check_directories(self):
|
|
os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
|
|
os.makedirs(settings.THUMBNAIL_DIR, exist_ok=True)
|
|
os.makedirs(settings.ORIGINALS_DIR, exist_ok=True)
|
|
os.makedirs(settings.ARCHIVE_DIR, exist_ok=True)
|
|
|
|
def pre_check_duplicate(self):
|
|
with open(self.path, "rb") as f:
|
|
checksum = hashlib.md5(f.read()).hexdigest()
|
|
#if ReceipeImage.objects.filter(Q(checksum=checksum) | Q(archive_checksum=checksum)).exists(): # NOQA: E501
|
|
if ReceipeImage.objects.filter(checksum=checksum).exists():
|
|
if settings.CONSUMER_DELETE_DUPLICATES:
|
|
os.unlink(self.path)
|
|
self._fail(
|
|
MESSAGE_RECEIPE_ALREADY_EXISTS,
|
|
f"Not consuming {self.filename}: It is a duplicate."
|
|
)
|
|
|
|
|
|
def try_consume_file(self,
|
|
path,
|
|
applyBinarize=True,
|
|
debug=False,
|
|
task_id=None,
|
|
scannerFile=False):
|
|
"""
|
|
Return the receipe object if it was successfully created.
|
|
"""
|
|
|
|
self.path = path
|
|
self.filename = os.path.basename(path)
|
|
self.task_id = task_id or str(uuid.uuid4())
|
|
|
|
self._send_progress(0, 100, 'STARTING', MESSAGE_NEW_FILE)
|
|
|
|
# this is for grouping logging entries for this particular file
|
|
# together.
|
|
|
|
self.renew_logging_group()
|
|
|
|
# Make sure that preconditions for consuming the file are met.
|
|
|
|
self.pre_check_file_exists()
|
|
self.pre_check_directories()
|
|
self.pre_check_duplicate()
|
|
|
|
self.log("info", f"Consuming {self.filename}")
|
|
|
|
# Determine the parser class.
|
|
|
|
|
|
# Notify all listeners that we're going to do some work.
|
|
|
|
#document_consumption_started.send(
|
|
# sender=self.__class__,
|
|
# filename=self.path,
|
|
# logging_group=self.logging_group
|
|
#)
|
|
|
|
|
|
def progress_callback(current_progress, max_progress):
|
|
# recalculate progress to be within 20 and 80
|
|
p = int((current_progress / max_progress) * 50 + 20)
|
|
self._send_progress(p, 100, "WORKING")
|
|
|
|
# This doesn't parse the document yet, but gives us a parser.
|
|
|
|
print("info Create parser")
|
|
|
|
document_parser = ReceipeParser(self.logging_group, debug = debug)
|
|
|
|
self.log("debug", f"Parser: {type(document_parser).__name__}")
|
|
|
|
# Parse the document. This may take some time.
|
|
|
|
articles = None
|
|
date = None
|
|
market = None
|
|
|
|
#Crop and binarize image
|
|
if applyBinarize:
|
|
self.path_bin = self.path[:-4]+'_binarized_cropped.jpg'
|
|
if scannerFile:
|
|
crop_binarize_scanner(self.path, self.path_bin)
|
|
else:
|
|
crop_binarize(self.path, self.path_bin)
|
|
self.filename_bin = os.path.basename(self.path_bin)
|
|
else:
|
|
self.path_bin = self.path
|
|
self.filename_bin = os.path.basename(self.path)
|
|
|
|
self._send_progress(20, 100, 'WORKING', MESSAGE_PARSING_RECEIPE)
|
|
self.log("debug", "Parsing {}...".format(self.filename))
|
|
print("Start parsing...")
|
|
if scannerFile:
|
|
document_parser.parse(self.path_bin, self.filename_bin, source='scanner')
|
|
else:
|
|
document_parser.parse(self.path_bin, self.filename_bin, source='cam')
|
|
print("... done")
|
|
|
|
self.log("debug", f"Generating thumbnail for {self.filename}...")
|
|
self._send_progress(70, 100, 'WORKING',
|
|
MESSAGE_GENERATING_THUMBNAIL)
|
|
|
|
articles = document_parser.get_articles()
|
|
|
|
date = document_parser.get_date()
|
|
|
|
market = document_parser.get_market()
|
|
|
|
total = document_parser.get_total()
|
|
|
|
if debug:
|
|
#print(articles)
|
|
print(date)
|
|
print(market)
|
|
print(total)
|
|
|
|
|
|
self._send_progress(90, 100, 'WORKING',
|
|
MESSAGE_PARSE_DATE)
|
|
|
|
#archive_path = document_parser.get_archive_path()
|
|
|
|
# Prepare the document classifier.
|
|
self._send_progress(95, 100, 'WORKING', MESSAGE_SAVE_RECEIPE)
|
|
# now that everything is done, we can start to store the document
|
|
# in the system. This will be a transaction and reasonably fast.
|
|
if not debug:
|
|
try:
|
|
with transaction.atomic():
|
|
|
|
# store the receipe.
|
|
receipeImage = self._store(
|
|
articles=articles,
|
|
date=date,
|
|
market=market,
|
|
total=total
|
|
)
|
|
|
|
# If we get here, it was successful. Proceed with post-consume
|
|
# hooks. If they fail, nothing will get changed.
|
|
|
|
#document_consumption_finished.send(
|
|
# sender=self.__class__,
|
|
# document=document,
|
|
# logging_group=self.logging_group,
|
|
# classifier=classifier
|
|
#)
|
|
|
|
# After everything is in the database, copy the files into
|
|
# place. If this fails, we'll also rollback the transaction.
|
|
with FileLock(settings.MEDIA_LOCK):
|
|
create_source_path_directory(receipeImage.source_path)
|
|
|
|
self._write(self.path, receipeImage.source_path)
|
|
self._write(self.path_bin, receipeImage.source_path_trashed)
|
|
|
|
# Delete the file only if it was successfully consumed
|
|
self.log("debug", "Deleting file {}".format(self.path))
|
|
os.unlink(self.path)
|
|
self.log("debug", "Deleting file {}".format(self.path_bin))
|
|
os.unlink(self.path_bin)
|
|
|
|
# https://github.com/jonaswinkler/paperless-ng/discussions/1037
|
|
shadow_file = os.path.join(
|
|
os.path.dirname(self.path),
|
|
"._" + os.path.basename(self.path))
|
|
|
|
if os.path.isfile(shadow_file):
|
|
self.log("debug", "Deleting file {}".format(shadow_file))
|
|
os.unlink(shadow_file)
|
|
|
|
shadow_file = os.path.join(
|
|
os.path.dirname(self.path_bin),
|
|
"._" + os.path.basename(self.path_bin))
|
|
|
|
if os.path.isfile(shadow_file):
|
|
self.log("debug", "Deleting file {}".format(shadow_file))
|
|
os.unlink(shadow_file)
|
|
|
|
|
|
|
|
except Exception as e:
|
|
self._fail(
|
|
str(e),
|
|
f"The following error occured while consuming "
|
|
f"{self.filename}: {e}",
|
|
exc_info=True
|
|
)
|
|
finally:
|
|
pass
|
|
#document_parser.cleanup()
|
|
|
|
#self.run_post_consume_script(document)
|
|
|
|
self.log(
|
|
"info",
|
|
"Receipe {} consumption finished".format(receipeImage)
|
|
)
|
|
|
|
self._send_progress(100, 100, 'SUCCESS', MESSAGE_FINISHED, receipeImage.id)
|
|
|
|
return receipeImage
|
|
else:
|
|
return None
|
|
|
|
def _write(self,source, target):
|
|
with open(source, "rb") as read_file:
|
|
with open(target, "wb") as write_file:
|
|
write_file.write(read_file.read())
|
|
|
|
def _store(self, articles, date, market, total):
|
|
|
|
stats = os.stat(self.path)
|
|
|
|
self.log("debug", "Saving record to database")
|
|
|
|
created = date or timezone.make_aware(
|
|
datetime.datetime.fromtimestamp(stats.st_mtime))
|
|
|
|
#Save market if it not allready exists
|
|
market.save()
|
|
|
|
|
|
dateName=date or datetime.datetime.now()
|
|
try:
|
|
self.filename = ('Receipe_'+
|
|
str(uuid.uuid4())+
|
|
'_'+
|
|
dateName.strftime('%d-%m-%Y-%H-%M-%S')+
|
|
'.jpg'
|
|
)
|
|
self.filename_bin=self.filename[:-4]+'thrased.jpg'
|
|
except:
|
|
print('Something is wrong with new filename')
|
|
|
|
#Save receipeImage to database
|
|
with open(self.path, "rb") as f, open(self.path_bin, "rb") as fbin:
|
|
receipeImage = ReceipeImage.objects.create(
|
|
filename=self.filename,
|
|
filename_trashed=self.filename_bin,
|
|
checksum=hashlib.md5(f.read()).hexdigest(),
|
|
thrashed_checksum=hashlib.md5(fbin.read()).hexdigest(),
|
|
created=created,
|
|
modified=created,
|
|
)
|
|
receipeImage.save()
|
|
|
|
|
|
|
|
#Create new purchase
|
|
purchase = Purchase.objects.create(
|
|
purchase_date=dateName,
|
|
total_price=total,
|
|
market=market,
|
|
receipeImage=receipeImage
|
|
)
|
|
|
|
purchase.save()
|
|
|
|
for element in articles[1]:
|
|
if len(element.name) >= 50:
|
|
element.name = element.name[0:49]
|
|
if len(element.nameString) >= 50:
|
|
element.nameString = element.nameString[0:49]
|
|
|
|
article = Article.objects.create(
|
|
name=element.name
|
|
)
|
|
|
|
article.save()
|
|
|
|
purchaseArticle = PurchaseArticle.objects.create(
|
|
purchase_id=purchase,
|
|
article_id=article,
|
|
quantity=element.quantity,
|
|
price=element.price,
|
|
inSale=False
|
|
)
|
|
|
|
purchaseArticle.save()
|
|
|
|
try:
|
|
articleMaps = ArticleMaps.objects.create(
|
|
article=article,
|
|
receipeString=element.nameString,
|
|
location_x=element.nameBBox.x,
|
|
location_y=element.nameBBox.y,
|
|
location_h=element.nameBBox.h,
|
|
location_w=element.nameBBox.w,
|
|
receipeImage=receipeImage
|
|
)
|
|
except AttributeError:
|
|
articleMaps = ArticleMaps.objects.create(
|
|
article=article,
|
|
receipeString=element.nameString,
|
|
location_x=0,
|
|
location_y=0,
|
|
location_h=0,
|
|
location_w=0,
|
|
receipeImage=receipeImage
|
|
)
|
|
|
|
articleMaps.save()
|
|
|
|
for element in articles[0]:
|
|
print(element)
|
|
print(element.name)
|
|
print(element.articleId.pk)
|
|
article = Article.objects.get(
|
|
pk=element.articleId.pk
|
|
)
|
|
|
|
purchaseArticle = PurchaseArticle.objects.create(
|
|
purchase_id=purchase,
|
|
article_id=article,
|
|
quantity=element.quantity,
|
|
price=element.price,
|
|
inSale=False
|
|
)
|
|
|
|
purchaseArticle.save()
|
|
|
|
try:
|
|
articleMaps = ArticleMaps.objects.create(
|
|
article=article,
|
|
receipeString=element.nameString,
|
|
location_x=element.nameBBox.x,
|
|
location_y=element.nameBBox.y,
|
|
location_h=element.nameBBox.h,
|
|
location_w=element.nameBBox.w,
|
|
receipeImage=receipeImage
|
|
)
|
|
except AttributeError:
|
|
articleMaps = ArticleMaps.objects.create(
|
|
article=article,
|
|
receipeString=element.nameString,
|
|
location_x=0,
|
|
location_y=0,
|
|
location_h=0,
|
|
location_w=0,
|
|
receipeImage=receipeImage
|
|
)
|
|
|
|
articleMaps.save()
|
|
|
|
return receipeImage
|
|
|
|
|