Use Tesseract OCR to separate each column image into rows, saving a cropped image and the recognised text for every row.
import cv2
import pandas as pd
import numpy as np
try:
    from PIL import Image
except ImportError:
    import Image
import pytesseract
import statistics
import re
import os
import tempfile
from fuzzywuzzy import fuzz
from tqdm.auto import tqdm
from pathlib import Path
# These OCR image preprocessing steps are based on https://stackoverflow.com/a/43493383
# I don't really understand why this particular combination of filters works, but it does seem to improve OCR results
BINARY_THRESHOLD = 200
def process_image_for_ocr(file_path):
    # TODO: implement using OpenCV throughout
    temp_filename = set_image_dpi(file_path)
    im_new = remove_noise_and_smooth(temp_filename)
    return im_new
def set_image_dpi(file_path):
    im = Image.open(file_path)
    # Resizing experiments, currently disabled:
    # length_x, width_y = im.size
    # factor = max(1, int(IMAGE_SIZE / length_x))
    # size = factor * length_x, factor * width_y
    # size = (1800, 1800)
    # im_resized = im.resize(size, Image.ANTIALIAS)
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg')
    temp_filename = temp_file.name
    im.save(temp_filename, dpi=(300, 300))
    return temp_filename
def image_smoothening(img):
    # Fixed threshold, then Otsu's threshold, a light blur, and Otsu again
    ret1, th1 = cv2.threshold(img, BINARY_THRESHOLD, 255, cv2.THRESH_BINARY)
    ret2, th2 = cv2.threshold(th1, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    blur = cv2.GaussianBlur(th2, (1, 1), 0)
    ret3, th3 = cv2.threshold(blur, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return th3
def remove_noise_and_smooth(file_name):
    img = cv2.imread(file_name, 0)
    filtered = cv2.adaptiveThreshold(img.astype(np.uint8), 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 41, 3)
    # Morphological opening and closing to knock out speckles
    kernel = np.ones((1, 1), np.uint8)
    opening = cv2.morphologyEx(filtered, cv2.MORPH_OPEN, kernel)
    closing = cv2.morphologyEx(opening, cv2.MORPH_CLOSE, kernel)
    img = image_smoothening(img)
    # Combine the two binarised versions
    or_image = cv2.bitwise_or(img, closing)
    return or_image
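A quick way to eyeball the effect of these filters is to run one column image through the pipeline and save the result alongside the original. This is just a sketch; `preview_path` is a placeholder you would point at a real column image.
preview_path = Path('preview-col.jpg')  # placeholder: substitute a real column image
if preview_path.exists():
    # Save a cleaned-up copy so the before/after can be compared side by side
    cv2.imwrite('preprocessed-preview.jpg', remove_noise_and_smooth(str(preview_path)))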
def get_ocr(image_path):
    temp_img = process_image_for_ocr(image_path)
    df = pytesseract.image_to_data(temp_img, config='--psm 4 --oem 1 -l eng', output_type=pytesseract.Output.DATAFRAME)
    return df
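For reference, `image_to_data` returns one row per layout element; the level 5 rows used throughout the code below are individual words, with bounding boxes (`left`, `top`, `width`, `height`) and a confidence score. A quick look at the output, reusing the placeholder `preview_path` from above:
if preview_path.exists():
    ocr_df = get_ocr(preview_path)
    # Show the first few word-level results
    print(ocr_df.loc[ocr_df['level'] == 5, ['text', 'left', 'top', 'width', 'height', 'conf']].head())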
"""
def find_col_width(df):
for confidence in reversed(range(60, 110, 10)):
for heading in ['buyers', 'closing', 'quotations']:
for word in df.loc[(df['level'] == 5) & (df['top'] < 100)].sort_values(by='top', ascending=False).itertuples():
# print(word.text.lower())
if len(str(word.text)) > 5 and fuzz.partial_ratio(heading, word.text.lower()) >= confidence:
# print(word)
# print(fuzz.ratio('buyers', word.text.lower()))
if word.left > 250:
return word.left
return None
"""
def find_col_width(df):
    # Find the left edge of the second column by fuzzy-matching its headings
    # ('buyers', 'closing', 'quotations') against the OCR'd words near the top
    candidates = []
    for confidence in reversed(range(80, 110, 10)):
        for heading in ['buyers', 'closing', 'quotations']:
            for word in df.loc[(df['level'] == 5) & (df['left'] < 1500)].sort_values(by='top').itertuples():
                if len(str(word.text)) > 5 and fuzz.partial_ratio(heading, word.text.lower()) >= confidence:
                    if word.left > 625:
                        candidates.append(word.left)
    # Use the leftmost match, with a 10 pixel margin
    try:
        lowest = sorted(candidates)[0] - 10
    except IndexError:
        lowest = None
    return lowest
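So the function returns the pixel position of the second column's left edge (less the margin), or None when no heading matches. A quick check on the placeholder image from above:
if preview_path.exists():
    # Prints something like 700, or None if no heading was matched
    print(find_col_width(get_ocr(preview_path)))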
def save_row(img, words, word_top, word_height, word_left, col_width, width, row_file):
    # Crop a single row of text (with a little padding) out of the column image and save it
    new_img = img.copy()
    left = 0
    if word_height:
        # Mark the baseline of the row
        cv2.line(new_img, (word_left, word_top + word_height), (col_width, word_top + word_height), (255, 0, 0), 1)
        row = new_img[max(0, word_top - 20):word_top + word_height + 20, left:word_left + width]
        cv2.imwrite(str(row_file), row)
def find_text(vol_dir, image_path, save_markup=False):
    col_data = []
    vol_id = re.search(r'(AU NBAC N193-\d+)', str(image_path)).group(1)
    image_name = os.path.basename(image_path)
    page_id, col_id = re.search(r'N193-\d+_(\d+)\.*-col-(\d+)\.jpg', image_name).groups()
    page_id = int(page_id)
    col_id = int(col_id)
    # output_path = Path(vol_dir, 'rows')
    output_path = Path(f'/Volumes/bigdata/mydata/stockexchange/rows/{vol_id}')
    output_path.mkdir(parents=True, exist_ok=True)
    rowcol_path = Path(output_path, 'cols')
    rowcol_path.mkdir(parents=True, exist_ok=True)
    df = get_ocr(image_path)
    img = cv2.imread(str(image_path))
    h, w = img.shape[:2]
    col_width = find_col_width(df)
    if not col_width:
        # Fall back to a default column position
        col_width = 1000
    if save_markup:
        # Side-by-side image: original column on the left, extracted text on the right
        new_img = np.zeros((h, w * 2, 3), np.uint8)
        new_img[:] = (255, 255, 255)
        new_img[0:h, 0:w] = img
        cv2.line(new_img, (col_width, 0), (col_width, h), (0, 0, 255), 1)
        # cv2.freetype needs an opencv-contrib build compiled with FreeType support
        ft = cv2.freetype.createFreeType2()
        ft.loadFontData(fontFileName='/Library/Fonts/Arial Unicode.ttf', id=0)
    row_id = 0
    for para, lines in df.loc[(df['level'] == 5)].groupby(by=['block_num', 'par_num']):
        for line, words in lines.groupby(by=['line_num']):
            left = 10000
            right = 0
            top = 10000
            height = 0
            heights = []
            tops = []
            name_parts = []
            for word in words.itertuples():
                # Make sure it's not just nonsense
                cleaned_word = re.sub(r'[^&%\(\)\"\w\s\/\-]', '', str(word.text))
                if cleaned_word and not cleaned_word.isspace() and (word.left + word.width) < (col_width + 20):
                    name_parts.append(str(word.text))
                    if word.left < left:
                        left = word.left
                    if word.top < top:
                        top = word.top
                    if word.height > height:
                        height = word.height
                    if word.left + word.width > right:
                        right = word.left + word.width
                    tops.append(word.top)
                    heights.append(word.height)
            # Position the row using the mean of the word boxes
            # (guard against lines where no words survived cleaning)
            if heights:
                height = int(round(statistics.mean(heights)))
                top = int(round(statistics.mean(tops)))
            if name_parts:
                # Normalise quote marks
                name_string = ' '.join(name_parts).replace('”', '"').replace('»', '"').replace('’', "'")
                # Remove non-word characters & most punctuation
                cleaned_name = re.sub(r'[^&%\(\)\"\w\s\/\-\']', '', name_string)
                # OCR seems to turn rows of dots into these words (and perhaps others), so strip them from the end
                cleaned_name = re.sub(r'(\s*\b(?:on|an|we|ee|oe|os|vs)\b[,\s]*)+$', '', cleaned_name).strip()
                if cleaned_name and not cleaned_name.isspace():
                    col_data.append({'vol_id': vol_id, 'page_id': page_id, 'col_id': col_id, 'row_id': row_id, 'text': cleaned_name, 'left': left, 'top': top, 'height': height, 'right': right})
                    if save_markup:
                        cv2.line(new_img, (0, top + height), (w * 2, top + height), (255, 0, 0), 2)
                        ft.putText(new_img, cleaned_name, (w + 20, top + height), fontHeight=40, color=(0, 0, 0), thickness=-1, line_type=cv2.LINE_AA, bottomLeftOrigin=True)
                    row_file = Path(output_path, f'{image_name[:-4].replace(".", "")}-row-{row_id}.jpg')
                    save_row(img, words, top, height, left, col_width, w, row_file)
                    row_id += 1
    if save_markup:
        markup_img = Path(rowcol_path, f'{image_name[:-4]}-rows.jpg')
        cv2.imwrite(str(markup_img), new_img)
    return col_data
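find_text can be tried on a single column image before running the full loop below. The path here is hypothetical, but follows the naming pattern the regexes above expect (the volume id in the path, page and column numbers in the filename).
test_img = Path('/Volumes/bigdata/mydata/stockexchange/processed/AU NBAC N193-103/columns/N193-103_0001-col-1.jpg')  # hypothetical example path
if test_img.exists():
    rows = find_text(test_img.parent.parent, test_img, save_markup=False)
    print(rows[:5])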
input_path = Path('/Volumes/bigdata/mydata/stockexchange/processed')
start_vol = 103
for vol_dir in tqdm([d for d in input_path.glob('*') if d.is_dir()], desc='Directories'):
    # Output directories are created inside find_text()
    vol_num = int(re.search(r'(\d+)$', vol_dir.name).group(1))
    vol_data = []
    columns_dir = Path(vol_dir, 'columns')
    if vol_num >= start_vol:
        for img_name in tqdm([i for i in columns_dir.glob('*.jpg')]):
            vol_data += find_text(vol_dir, img_name, save_markup=True)
        if vol_data:
            df_text = pd.DataFrame(vol_data).sort_values(by=['vol_id', 'page_id', 'col_id', 'row_id'])
            df_text.to_csv(f'vol-{vol_num}-text.csv')
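The per-volume CSVs can then be stitched into a single dataset afterwards. A minimal sketch:
all_csvs = sorted(Path('.').glob('vol-*-text.csv'))
if all_csvs:
    # Concatenate every vol-*-text.csv written by the loop above
    df_all = pd.concat((pd.read_csv(c) for c in all_csvs), ignore_index=True)
    print(df_all.shape)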