#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Apr 30 22:47:15 2022
@author: elena
"""
#import logging
import cv2
import re
import os
from copy import deepcopy
import numpy as np
import pytesseract
from scipy import stats
from ordered_set import OrderedSet
from .loggers import LoggingMixin
from .image_processing import BBox
from .classes import noDBArticle
#from .models import ReceipeString, Market
import receipe.models as models
from .linearalignment import LinearAlignment
from django.conf import settings
from django.utils import timezone
from django.contrib.postgres.search import TrigramSimilarity
# This regular expression will try to find dates in the document at
# hand and will match the following formats:
# - XX.YY.ZZZZ with XX and YY being 1 or 2 digits and ZZZZ being 2 or 4 digits
# - XX/YY/ZZZZ with XX and YY being 1 or 2 digits and ZZZZ being 2 or 4 digits
# - XX-YY-ZZZZ with XX and YY being 1 or 2 digits and ZZZZ being 2 or 4 digits
# - ZZZZ.XX.YY with XX and YY being 1 or 2 digits and ZZZZ being 2 or 4 digits
# - ZZZZ/XX/YY with XX and YY being 1 or 2 digits and ZZZZ being 2 or 4 digits
# - ZZZZ-XX-YY with XX and YY being 1 or 2 digits and ZZZZ being 2 or 4 digits
# - XX. MONTH ZZZZ with XX being 1 or 2 digits and ZZZZ being 2 or 4 digits
# - MONTH ZZZZ, with ZZZZ being 4 digits
# - MONTH XX, ZZZZ with XX being 1 or 2 digits and ZZZZ being 4 digits
DATE_REGEX = re.compile(
r'(\b|(?!=([_-])))([0-9]{1,2})[\.\/-]([0-9]{1,2})[\.\/-]([0-9]{4}|[0-9]{2})(\b|(?=([_-])))|' # NOQA: E501
r'(\b|(?!=([_-])))([0-9]{4}|[0-9]{2})[\.\/-]([0-9]{1,2})[\.\/-]([0-9]{1,2})(\b|(?=([_-])))|' # NOQA: E501
r'(\b|(?!=([_-])))([0-9]{1,2}[\. ]+[^ ]{3,9} ([0-9]{4}|[0-9]{2}))(\b|(?=([_-])))|' # NOQA: E501
r'(\b|(?!=([_-])))([^\W\d_]{3,9} [0-9]{1,2}, ([0-9]{4}))(\b|(?=([_-])))|'
r'(\b|(?!=([_-])))([^\W\d_]{3,9} [0-9]{4})(\b|(?=([_-])))'
)
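# Illustrative strings the pattern above is meant to match (a sketch, not exhaustive):
#   "21.03.2022", "21/03/22", "2022-03-21", "21. März 2022", "März 2022", "March 21, 2022"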
STREET_REGEX = re.compile(r'[A-Z][a-z]{1,} \d{1,3}')
ZIP_REGEX = re.compile(r'\d{5} .*') #Valid for Germany
PHONE_REGEX = re.compile(r'\d{4,}(\/|-| \/ | - )\d*')
FLOAT_REGEX = re.compile(r'(|-|\+)\d*(,|\.)\d*')
MULTIPLE_REGEX = re.compile(r'\d{1,}(x|X){1,}')
class ReceipeParser(LoggingMixin):
logging_name = "receipeServer.parsing"
def __init__(self, logging_group, debug=False, progress_callback=None):
super().__init__()
self.logging_group = logging_group
os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
self.debug = debug
self.archive_path = None
self.date = None
self.market = None
self.articles = None
self.total = None
self.progress_callback = progress_callback
self.knownArticles = OrderedSet([])
self.unknownArticles = set([])
def getLine(self,boxList: list) -> list:
"""Take a list of bounding boxes (with text) and combine them to (text) lines
Args:
boxList (list): The list containing all bounding boxes
Returns:
lines (list): a list of (text) lines (one bouning box per line)
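Example (an illustrative sketch, assuming BBox(x, y, w, h) from image_processing):
three boxes BBox(0, 10, 40, 20), BBox(50, 12, 40, 18), BBox(0, 60, 40, 20)
are grouped into two lines: [box0, box1] (their y ranges overlap by more
than 50% of each box height) and [box2], each line sorted by x.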
"""
lines = []
idx = 0
while idx < len(boxList):
line = [boxList[idx]]
yref1 = boxList[idx].y
yref2 = yref1 + boxList[idx].h
idy = idx + 1
#Iterate over all boxes
while idy < len(boxList):
yval1 = boxList[idy].y
yval2 = boxList[idy].y + boxList[idy].h
#Calculate the length of the vertical overlap between the two boxes
l = np.abs(np.max([yval1,yref1]) - np.min([yval2,yref2]))
#If the overlap covers more than 50% of both boxes and the boxes actually intersect, add the box to the line
if l/boxList[idx].h > 0.5 and l/boxList[idy].h > 0.5 and not (yval2 < yref1 or yval1 > yref2):
line.append(boxList[idy])
idx = idx + 1
idy = idy + 1
else:
idy = idy + 1
line.sort(key=lambda s: s.x)
lines.append(line)
idx = idx + 1
return lines
def grouper(self, iterable: list, interval: int = 2) -> list:
"""Group iterable into lines, within interval height. I do not understand what the code does
Args:
iterable (list): The list containing all (character) bounding boxes
interval (int): Measure to decide if boxes should be included in line
Returns:
group (generator): returns a generator for the lines
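Example (illustrative): with interval=2, boxes whose y1 values are
10, 11, 30, 31 are yielded as two groups ([10, 11] and [30, 31]).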
"""
prev = None
group = []
for item in iterable:
if not prev or abs(item[1] - prev[1]) <= interval:
group.append(item)
else:
yield group
group = [item]
prev = item
if group:
yield group
def doOCR(self,imgwidth: int, rois: list, lines: list, h: float, border: int = 10) -> list:
"""For the given rois (image parts) apply OCR and return the corresponding text
Args:
imgwidth (int): Width of the total image
rois (list): List containing the image fragments
lines (list): List with the bounding boxes
h (float): Mean line height
border (int): Padding to apply around the bounding box
Returns:
texts (list): a list of the text of the different lines
"""
texts = []
#Tesseract
for idy in range(0,len(rois)):
lineroi = rois[idy]
linebox = lines[idy]
tmptext = []
for idx in range(len(lineroi)):
#Paste the box (resized to the mean line height) onto a white canvas at its original x position and OCR it
lineimg = np.ones((h+2*border,imgwidth), np.uint8)*255
lineimg[border:border+h,linebox[idx].x:linebox[idx].x+linebox[idx].w][:] = cv2.resize(lineroi[idx], (linebox[idx].w,h))
tmptext.append(pytesseract.image_to_string(lineimg,lang='deu').strip())
if tmptext:
texts.append(tmptext)
print(texts)
return texts
def parse_date(self,text: str):
"""
Returns the date of the receipt, or None if no plausible date is found in the given text.
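Example (an illustrative sketch): parse_date("Datum 21.03.2022 14:33")
should return a timezone-aware datetime for 21 March 2022, provided
dateparser accepts the string and settings.DATE_ORDER fits (e.g. "DMY").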
"""
def __parser(ds, date_order):
"""
Call dateparser.parse with a particular date ordering
"""
import dateparser
return dateparser.parse(
ds,
settings={
"DATE_ORDER": date_order,
"PREFER_DAY_OF_MONTH": "first",
"RETURN_AS_TIMEZONE_AWARE":
True
}
)
def __filter(date):
if date and date.year > 1900 and \
date <= timezone.now():
return date
return None
date = None
# Iterate through all regex matches in text and try to parse the date
for m in re.finditer(DATE_REGEX, text):
date_string = m.group(0)
try:
date = __parser(date_string, settings.DATE_ORDER)
except (TypeError, ValueError):
# Skip all matches that do not parse to a proper date
continue
date = __filter(date)
if date is not None:
break
return date
def parse_zip_city(self,text):
zipCode = None
city = None
for m in re.findall(ZIP_REGEX, text):
stringElements = m.split(' ')
zipCode = stringElements[0]
if len(stringElements) > 1:
city = stringElements[1]
return zipCode, city
def parse_street_streetNum(self,text):
street = None
streetNum = None
for m in re.findall(STREET_REGEX, text):
stringElements = m.split(' ')
street = stringElements[0]
if len(stringElements) > 1:
streetNum = stringElements[1]
return street, streetNum
def parse_phone(self,text):
for m in re.finditer(PHONE_REGEX, text):
return m.group(0)
def parse_float(self,text):
for m in re.finditer(FLOAT_REGEX, text):
value = m.group(0)
value = value.replace(',','.')
try:
value = float(value)
except ValueError:
value = 0
return value
return 0
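# Illustrative behaviour of the regex helpers above (a sketch, not unit tests):
#   parse_zip_city("12345 Berlin")            -> ("12345", "Berlin")
#   parse_street_streetNum("Hauptstrasse 12") -> ("Hauptstrasse", "12")
#   parse_phone("Tel. 0711/123456")           -> "0711/123456"
#   parse_float("SUMME 12,99 EUR")            -> 12.99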
def extractMarket(self,lineText,allText):
'''Extract the market details from the first quarter of the OCR lines (allowing for some artefacts from the logo)
'''
tmpMarket = models.Market()
for line in lineText:
if line[0] != '':
tmpMarket.name = line[0]
break
roi = ''
try:
for idx in range(0,int(len(lineText)/4)):
roi = roi + lineText[idx][0] +'\n'
except IndexError:
roi = ''
try:
tmpMarket.zipCode, tmpMarket.city = self.parse_zip_city(roi)
except TypeError:
try:
tmpMarket.zipCode, tmpMarket.city = self.parse_zip_city(allText)
except TypeError:
tmpMarket.zipCode = ''
tmpMarket.city = ''
try:
tmpMarket.street, tmpMarket.street_number = self.parse_street_streetNum(roi)
except TypeError:
try:
tmpMarket.street, tmpMarket.street_number = self.parse_street_streetNum(allText)
except TypeError:
tmpMarket.street = ''
tmpMarket.street_number = 0
try:
tmpMarket.phone = self.parse_phone(roi)
except TypeError:
try:
tmpMarket.phone = self.parse_phone(allText)
except TypeError:
tmpMarket.phone = ''
#TODO: Try some fuzzy search
#markets = Market.objects.filter( name=tmpMarket.name, street=tmpMarket.street)
# Trigram search for name and street
markets = models.Market.objects.annotate(similarity=TrigramSimilarity('name', tmpMarket.name)).filter(similarity__gt=0.3).order_by("-similarity")
markets = markets.annotate(similarity=TrigramSimilarity('street', tmpMarket.street)).filter(similarity__gt=0.3).order_by("-similarity")
if len(markets) != 0:
return markets[0]
else:
return tmpMarket
def progress(self, current_progress, max_progress):
if self.progress_callback:
self.progress_callback(current_progress, max_progress)
def fixString(self,oldString):
newString = oldString.replace('Ä','A').replace('Ö','O').replace('Ü','U').replace('ä','a').replace('ö','o').replace('ü','u').replace('#','').replace('©','o').replace('','').replace('*','').replace('',',')
return newString
def get_date(self):
return self.date
def get_market(self):
return self.market
def get_articles(self):
return [self.knownArticles, self.unknownArticles]
def get_total(self):
return self.total
def lineSegmentationSimple(self, img: np.array) -> list:
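"""Segment a (camera) receipt image into text lines.
The inverted image is dilated with a wide, flat kernel, contours are taken
from the Canny edges of the result, boxes with an implausible height
z-score are dropped, and the remaining boxes are grouped into lines via
getLine; the line order is reversed before returning. Returns a list of
lines, each a list of BBox objects sorted by x.
"""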
#%% Dilate with a wide, flat kernel to merge characters horizontally into text lines
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (75,2))
dilate = cv2.dilate(cv2.bitwise_not(img), kernel, iterations=1)
# Find contours and filter using aspect ratio
# Remove non-text contours by filling in the contour
edge = cv2.Canny(dilate, 100, 250)
cnts = cv2.findContours(edge, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
cnts = cnts[0] if len(cnts) == 2 else cnts[1]
xall = []
yall = []
wall = []
hall = []
textBoxes = []
for c in cnts:
x,y,w,h = cv2.boundingRect(c)
xall.append(x)
yall.append(y)
wall.append(w)
hall.append(h)
zscoreH = stats.zscore(hall)
for idx in range(0,len(xall)):
#ar = w / float(h)
#if hall[idx] > lowerHbound:# and hall[idx] < upperHbound:
if zscoreH[idx] > 0.1 and zscoreH[idx] < 1.4 and hall[idx] < wall[idx]:
tmp = BBox(xall[idx],yall[idx],wall[idx],hall[idx])
textBoxes.append(tmp)
#cv2.drawContours(color, [cnts[idx]], -1, (0, 255, 0), 3, cv2.LINE_AA)
lines = self.getLine(textBoxes)
lines.reverse()
return lines
def lineSegmentationMSER(self, img: np.array) -> list:
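"""Segment a (scanner) receipt image into text lines using MSER regions.
Bounding boxes of the MSER convex hulls are sorted by their top y
coordinate and grouped via grouper, using a third of the median glyph
height as the interval. Each merged line is then split at wide horizontal
gaps (runs of more than 70 empty columns in the vertical projection of
its Canny edges). Returns a list of lines, each a list of BBox objects.
"""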
mser = cv2.MSER_create()
regions, bboxes = mser.detectRegions(img)
hulls = [cv2.convexHull(p.reshape(-1, 1, 2)) for p in regions]
bboxes_list = list()
heights = list()
for hull in hulls:
x, y, w, h = cv2.boundingRect(hull)
bboxes_list.append([x, y, x + w, y + h]) # Create list of bounding boxes, with each bbox containing the left-top and right-bottom coordinates
heights.append(h)
heights = sorted(heights) # Sort heights
median_height = heights[int(len(heights) / 2)] / 3 # Find third of the median height
#print(median_height)
bboxes_list = sorted(bboxes_list, key=lambda k: k[1]) # Sort the bounding boxes based on y1 coordinate ( y of the left-top coordinate )
combined_bboxes = self.grouper(bboxes_list, median_height) # Group the bounding boxes
lines = []
for group in combined_bboxes:
x_min = min(group, key=lambda k: k[0])[0] # Find min of x1
x_max = max(group, key=lambda k: k[2])[2] # Find max of x2
y_min = min(group, key=lambda k: k[1])[1] # Find min of y1
y_max = max(group, key=lambda k: k[3])[3] # Find max of y2
if abs(y_min - y_max) < 3 * 3 * median_height and abs(y_min - y_max) > median_height and abs(x_min - x_max) > 100:
#cv2.rectangle(img, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)
lines.append(BBox(x_min,y_min,x_max-x_min,y_max-y_min))
linesWithSplit = []
for element in lines:
edges = cv2.Canny(img[element.y:element.y+element.h,element.x:element.x+element.w],100,200)
#imgplot = plt.imshow(edges)
#Calculate projection onto x axis
proj_v = np.sum(edges,0)
#plt.plot(proj_v)
#plt.show()
count = 0
prev = 0
indexend = 0
zeroSequence = []
gaplist = []
#Find all gaps (projected value of 0)
for i in range(0,len(proj_v)):
if proj_v[i] == 0:
count += 1
else:
if count > prev:
prev = count
indexend = i
if count > 70:
gaplist.append([prev,indexend-prev,indexend-1])
#indexend = 0
count = 0
prev = 0
indexend = 0
#print("The longest sequence of 0's is "+str(prev))
#print("index start at: "+ str(indexend-prev))
#print("index ends at: "+ str(indexend-1))
#Split the line at the borders of the gaps
if len(gaplist) < 1:
linesWithSplit.append([element])
elif len(gaplist) == 1:
tmpList = []
tmpList.append( BBox(element.x,element.y,gaplist[0][1],element.h) )
tmpList.append( BBox(element.x+gaplist[0][2],element.y,element.w-gaplist[0][2],element.h) )
linesWithSplit.append(tmpList)
else:
tmpList = []
for i in range(0,len(gaplist)):
if i == 0:
tmpList.append( BBox(element.x,element.y,gaplist[i][1],element.h) )
elif i < len(gaplist):
tmpList.append( BBox(element.x+gaplist[i-1][2],element.y,gaplist[i][1]-gaplist[i-1][2],element.h) )
tmpList.append( BBox(element.x+gaplist[-1][2],element.y,element.w-gaplist[-1][2],element.h) )
linesWithSplit.append(tmpList)
return linesWithSplit
def parse(self,path,inputfile,source='cam'):
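"""Parse a receipt image.
The image is segmented into text lines ('cam' uses the contour-based
segmentation, 'scanner' the MSER-based one), each line is OCRed with
tesseract, and market, date, total and articles are extracted from the
line texts. Parsed articles are matched against the database via trigram
similarity and collected in knownArticles / unknownArticles.
"""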
self.knownArticles = set([])
self.unknownArticles = set([])
# Read image from which text needs to be extracted
print(path)
img = cv2.imread(path,cv2.IMREAD_GRAYSCALE)
if source == 'cam':
lines = self.lineSegmentationSimple(img)
elif source == 'scanner':
lines = self.lineSegmentationMSER(img)
else:
raise ValueError("Unknown source '%s', expected 'cam' or 'scanner'" % source)
rois = []
hsum = 0
numh = 0
for idx in range(0,len(lines)):
line = lines[idx]
tmproi = []
for element in line:
hsum = hsum + element.h
numh = numh + 1
#cv2.rectangle(color,(element.x,element.y),(element.x+element.w,element.y+element.h),(255,0,0),3)
tmproi.append(img[element.y:element.y+element.h,element.x:element.x+element.w])
rois.append(tmproi)
hmean = hsum/numh
h = int(hmean)
#newimg = np.ones(((h+border)*len(rois)+2*border,len(img[0,:])), np.uint8)*255
lineText = self.doOCR(len(img[0,:]), rois, lines, h)
allText = pytesseract.image_to_string(img,lang='deu')
#%% Extract market
self.market = self.extractMarket(lineText,allText)
print(self.market.name)
print(self.market.street)
print(self.market.street_number)
print(self.market.zip_code)
print(self.market.city)
print(self.market.phone)
#%% Extract date
date = None
for element in lineText:
date = self.parse_date(' '.join(element))
if date is not None:
break
if date is None:
#Try again with OCR of the whole receipe
date = self.parse_date(allText)
self.date = date
print(self.date)
#%% Extract total
self.total = 0
for line in lineText:
text = ' '.join(line)
if 'EUR' in text or 'SUMME' in text or 'TOTAL' in text:
value = self.parse_float(text)
if value != 0:
self.total = value
break
# If we find no result, try again with the total text
if self.total == 0:
for line in allText.split('\n'):
text = line
if 'EUR' in text or 'SUMME' in text or 'TOTAL' in text:
value = self.parse_float(text)
if value != 0:
self.total = value
break
print(self.total)
#%% Extract articles
self.articles = []
newArticle = noDBArticle()
newArticle.quantity = 0
for idx in range(5,len(lineText)):
print(lineText[idx])
text = ' '.join(lineText[idx])
#The last relevant line of a receipt ends with the total (sum of prices), so we stop once we have seen it.
if ('EUR' in text or 'SUMME' in text or 'TOTAL' in text) and len(self.articles) > 1:
break
elif ('EUR' in text or 'SUMME' in text or 'TOTAL' in text) and len(self.articles) == 0:
continue
if newArticle.quantity <= 1:
newArticle.quantity = 1
#The loop body only runs if the multiplier regex matches.
for m in re.finditer(MULTIPLE_REGEX, text.replace(' ','').replace('\t','')):
string = m.group(0)
for k in re.finditer(r'\d{1,}', string):
newArticle.quantity = int(k.group(0))
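#e.g. a line collapsing to '2x1,99' yields quantity = 2 from the leading '2x' (illustrative)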
#TODO: Extract price per unit from this line
#If a quantity greater than 1 was found, this line only holds the multiplier and no article details, so we skip the rest of it
if newArticle.quantity > 1:
continue
if len(lineText[idx]) > 1:
newArticle.name = lineText[idx][0]
newArticle.nameString = lineText[idx][0]
newArticle.nameBBox = lines[idx][0]
#for element in lineText[idx]:
for idy in range(0,len(lineText[idx])):
try:
newArticle.price = self.parse_float(lineText[idx][idy]) / newArticle.quantity
except ZeroDivisionError:
print(newArticle.quantity)
newArticle.price = 0.01
if newArticle.price != 0:
newArticle.priceString = lineText[idx][idy]
newArticle.priceBBox = lines[idx][idy]
break
newArticle.name = self.fixString(newArticle.name)
copyOfNewArticle = deepcopy(newArticle)
self.articles.append(copyOfNewArticle)
newArticle.name=''
newArticle.quantity = 0
savedArticleMaps = models.ReceipeString.objects.all()
alignmendObject = LinearAlignment(self.logging_group)
for parsedArticle in self.articles:
matches = models.Article.objects.annotate(similarity=TrigramSimilarity('name', parsedArticle.name)).filter(similarity__gt=0.3).order_by("-similarity")
print(parsedArticle.name)
print(matches)
if len(matches) > 0:
# Use the best database match for the remaining comparisons
article = matches[0]
# We try to find out if we have already added the same article
tmpknownArticles = self.knownArticles.copy()
sizeSet = len(tmpknownArticles)
tmpknownArticles.add(article)
# If the size stays the same, the article is already in our list, so we increase its quantity by 1 and add it again
if len(tmpknownArticles) == sizeSet:
# Elements of a set are not accessible directly, so we have to iterate over all elements
for element in self.knownArticles:
if element.name == article.name and element.id == article.id:
# For the comparison of two elements the quantity does not count, so we remove the element first
self.knownArticles.remove(article)
article.quantity = article.quantity + 1
break
# Then we add the same article again, but with the changed quantity.
self.knownArticles.add(article)
# Otherwise, we just add it
else:
self.knownArticles.add(article)
else:
tmpunknownArticles = self.unknownArticles.copy()
sizeSet = len(tmpunknownArticles)
tmpunknownArticles.add(parsedArticle)
# If the size stays the same, the article is already in our list, so we increase its quantity by 1 and add it again
if len(tmpunknownArticles) == sizeSet:
# Elements of a set are not accessible directly, so we have to iterate over all elements
for element in self.unknownArticles:
if element.name == parsedArticle.name:
parsedArticle.quantity = parsedArticle.quantity + 1
break
# For the comparison of two elements the quantity does not count, so we remove it first
try:
self.unknownArticles.remove(parsedArticle)
except KeyError:
pass
# Then we add the same article again, but with the changed quantity.
self.unknownArticles.add(parsedArticle)
# Otherwise, we just add it
else:
self.unknownArticles.add(parsedArticle)
'''
for parsedArticle in self.articles:
possibleMatches = []
possibleMatchesScore = np.array([])
for articleMaps in savedArticleMaps:
#print(parsedArticle.name)
#print(articleMaps.receipeString)
#TODO: Add lower case letter to alignment matrix
alignmendObject.setStrings(parsedArticle.name.upper(), articleMaps.receipeString.upper())
try:
stringScore = alignmendObject.scoring()
except KeyError:
stringScore = 0
if stringScore > 0.75:
print(parsedArticle.name.upper()+' '+articleMaps.receipeString.upper()+' Score: '+str(stringScore))
possibleMatches.append([parsedArticle,articleMaps])
possibleMatchesScore = np.append(possibleMatchesScore, stringScore)
if stringScore == 1:
break
if len(possibleMatches) > 0:
maxIdx = np.argmax(possibleMatchesScore)
# Take the article with best matching name
# First entry of list is the parsedArticle, then we overwrite name and id with the known one from the DB
article = possibleMatches[maxIdx][0]
article.name = possibleMatches[maxIdx][1].receipeString
article.id = possibleMatches[maxIdx][1].pk
article.articleId = possibleMatches[maxIdx][1].article
# We try to find out, if we already have added the same article
tmpknownArticles = self.knownArticles.copy()
sizeSet = len(tmpknownArticles)
tmpknownArticles.add(article)
# If the size stays the same, the article is already in our list, so we increase its quantity by 1 and add it again
if len(tmpknownArticles) == sizeSet:
# Elements of a set are not accessible directly, so we have to iterate over all elements
for element in self.knownArticles:
if element.name == article.name and element.id == article.id:
# For the comparison of two elements, the quantity does not count, so we remove it first
self.knownArticles.remove(article)
article.quantity = article.quantity + 1
break
# Then we add same again, but with changed quantity.
self.knownArticles.add(article)
# Otherwise, we just add it
else:
self.knownArticles.add(article)
else:
tmpunknownArticles = self.unknownArticles.copy()
sizeSet = len(tmpunknownArticles)
tmpunknownArticles.add(parsedArticle)
# If the size stays the same, the article is already in our list, so we increase its quantity by 1 and add it again
if len(tmpunknownArticles) == sizeSet:
# Elements of a set are not accessible directly, so we have to iterate over all elements
for element in self.unknownArticles:
if element.name == parsedArticle.name:
parsedArticle.quantity = parsedArticle.quantity + 1
break
# For the comparison of two elements, the quantity does not count, so we remove it first
try:
self.unknownArticles.remove(parsedArticle)
except KeyError:
pass
# Then we add same again, but with changed quantity.
self.unknownArticles.add(parsedArticle)
# Otherwise, we just add it
else:
self.unknownArticles.add(parsedArticle)
'''
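# Illustrative usage (a minimal sketch, not part of the original module; it
# assumes a configured Django environment and an existing receipt image):
#
#   parser = ReceipeParser(logging_group="demo")
#   parser.parse("/tmp/receipt.png", inputfile=None, source="cam")
#   print(parser.get_market().name, parser.get_date(), parser.get_total())
#   knownArticles, unknownArticles = parser.get_articles()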