#!/usr/bin/env python
# coding: utf-8

# # Find rows in a column
#
# Use Tesseract to separate columns into rows.

# In[1]:

import cv2
import pandas as pd
import numpy as np
try:
    from PIL import Image
except ImportError:
    import Image
import pytesseract
import statistics
import re
import os
import tempfile
from fuzzywuzzy import fuzz
from tqdm.auto import tqdm
from pathlib import Path


# In[2]:

# These OCR image preprocessing steps are based on https://stackoverflow.com/a/43493383
# I don't really understand why this particular combination of filters works, but it does seem to improve OCR results.

BINARY_THRESHOLD = 200

def process_image_for_ocr(file_path):
    # TODO: Implement using opencv
    temp_filename = set_image_dpi(file_path)
    im_new = remove_noise_and_smooth(temp_filename)
    return im_new

def set_image_dpi(file_path):
    im = Image.open(file_path)
    # length_x, width_y = im.size
    # factor = max(1, int(IMAGE_SIZE / length_x))
    # size = factor * length_x, factor * width_y
    # size = (1800, 1800)
    # im_resized = im.resize(size, Image.ANTIALIAS)
    # Save a copy tagged as 300 dpi so Tesseract treats the scan at a sensible resolution
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg')
    temp_filename = temp_file.name
    im.save(temp_filename, dpi=(300, 300))
    return temp_filename

def image_smoothening(img):
    # Fixed threshold, then Otsu's threshold, a light blur, and Otsu again
    ret1, th1 = cv2.threshold(img, BINARY_THRESHOLD, 255, cv2.THRESH_BINARY)
    ret2, th2 = cv2.threshold(th1, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    blur = cv2.GaussianBlur(th2, (1, 1), 0)
    ret3, th3 = cv2.threshold(blur, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return th3

def remove_noise_and_smooth(file_name):
    img = cv2.imread(file_name, 0)
    filtered = cv2.adaptiveThreshold(img.astype(np.uint8), 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 41, 3)
    kernel = np.ones((1, 1), np.uint8)
    opening = cv2.morphologyEx(filtered, cv2.MORPH_OPEN, kernel)
    closing = cv2.morphologyEx(opening, cv2.MORPH_CLOSE, kernel)
    img = image_smoothening(img)
    # Combine the adaptive-thresholded image with the smoothed one
    or_image = cv2.bitwise_or(img, closing)
    return or_image


# In[3]:

def get_ocr(image_path):
    temp_img = process_image_for_ocr(image_path)
    # --psm 4 treats the image as a single column of text of variable sizes
    df = pytesseract.image_to_data(temp_img, config='--psm 4 --oem 1 -l eng', output_type=pytesseract.Output.DATAFRAME)
    return df

"""
An earlier version of find_col_width, kept for reference:

def find_col_width(df):
    for confidence in reversed(range(60, 110, 10)):
        for heading in ['buyers', 'closing', 'quotations']:
            for word in df.loc[(df['level'] == 5) & (df['top'] < 100)].sort_values(by='top', ascending=False).itertuples():
                # print(word.text.lower())
                if len(str(word.text)) > 5 and fuzz.partial_ratio(heading, word.text.lower()) >= confidence:
                    # print(word)
                    # print(fuzz.ratio('buyers', word.text.lower()))
                    if word.left > 250:
                        return word.left
    return None
"""

def find_col_width(df):
    # Look for column headings such as 'buyers' or 'closing quotations' and use their
    # positions to estimate where the name column ends
    candidates = []
    for confidence in reversed(range(80, 110, 10)):
        for heading in ['buyers', 'closing', 'quotations']:
            for word in df.loc[(df['level'] == 5) & (df['left'] < 1500)].sort_values(by='top').itertuples():
                # print(word.text.lower())
                if len(str(word.text)) > 5 and fuzz.partial_ratio(heading, word.text.lower()) >= confidence:
                    # print(word)
                    # print(fuzz.ratio('buyers', word.text.lower()))
                    if word.left > 625:
                        candidates.append(word.left)
    try:
        lowest = sorted(candidates)[0] - 10
    except IndexError:
        lowest = None
    return lowest
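# To see what `get_ocr` and `find_col_width` are working with, it can help to run them on a
# single column image and inspect the word-level boxes. This is a minimal sketch -- the image
# path below is a hypothetical example and needs to point at one of your own column images.
# With `Output.DATAFRAME`, pytesseract returns one row per detected element; `level` 5 rows
# are individual words, with pixel coordinates in `left`/`top`/`width`/`height` and a
# confidence score in `conf`.

# In[ ]:

# Hypothetical sample path -- adjust to a real column image
sample_col = Path('/Volumes/bigdata/mydata/stockexchange/processed/AU NBAC N193-103/columns/N193-103_0001-col-1.jpg')
if sample_col.exists():
    df_sample = get_ocr(sample_col)
    words = df_sample.loc[df_sample['level'] == 5, ['text', 'left', 'top', 'width', 'height', 'conf']]
    print(words.head(10))
    print('Detected column width:', find_col_width(df_sample))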
def save_row(img, words, word_top, word_height, word_left, col_width, width, row_file):
    # Crop out a single row (with a little padding above and below) and save it as an image
    new_img = img.copy()
    if word_height:
        # Underline the row so the crop is easier to check by eye
        cv2.line(new_img, (word_left, word_top + word_height), (col_width, word_top + word_height), (255, 0, 0), 1)
        row = new_img[max(0, word_top - 20):word_top + word_height + 20, 0:word_left + width]
        cv2.imwrite(str(row_file), row)

def find_text(vol_dir, image_path, save_markup=False):
    col_data = []
    vol_id = re.search(r'(AU NBAC N193-\d+)', str(image_path)).group(1)
    # vol_id = 'AU NBAC N193-001'
    # print(vol_id)
    image_name = os.path.basename(image_path)
    page_id, col_id = re.search(r'N193-\d+_(\d+)\.*-col-(\d+)\.jpg', image_name).groups()
    page_id = int(page_id)
    col_id = int(col_id)
    # print(page_id, col_id)
    # output_path = Path(vol_dir, 'rows')
    output_path = Path(f'/Volumes/bigdata/mydata/stockexchange/rows/{vol_id}')
    output_path.mkdir(parents=True, exist_ok=True)
    df = get_ocr(image_path)
    img = cv2.imread(str(image_path))
    h, w = img.shape[:2]
    col_width = find_col_width(df)
    if not col_width:
        # col_width = 400
        col_width = 1000
    # print(col_width)
    if save_markup:
        # Markup images go in a 'cols' subdirectory under the volume's 'rows' directory
        rowcol_path = Path(vol_dir, 'rows', 'cols')
        rowcol_path.mkdir(parents=True, exist_ok=True)
        # Put the original image on the left and the OCR'd text on the right
        new_img = np.zeros((h, w * 2, 3), np.uint8)
        new_img[:] = (255, 255, 255)
        new_img[0:h, 0:w] = img
        cv2.line(new_img, (col_width, 0), (col_width, h), (0, 0, 255), 1)
        # cv2.freetype needs a build of OpenCV that includes the freetype contrib module
        ft = cv2.freetype.createFreeType2()
        ft.loadFontData(fontFileName='/Library/Fonts/Arial Unicode.ttf', id=0)
    row_id = 0
    for para, lines in df.loc[(df['level'] == 5)].groupby(by=['block_num', 'par_num']):
        for line, words in lines.groupby(by=['line_num']):
            left = 10000
            right = 0
            heights = []
            tops = []
            name_parts = []
            for word in words.itertuples():
                # Make sure it's not just nonsense
                cleaned_word = re.sub(r'[^&%\(\)\"\w\s\/\-]', '', str(word.text))
                if cleaned_word and not cleaned_word.isspace() and (word.left + word.width) < (col_width + 20):
                    name_parts.append(str(word.text))
                    if word.left < left:
                        left = word.left
                    if word.left + word.width > right:
                        right = word.left + word.width
                    tops.append(word.top)
                    heights.append(word.height)
            # row_file = Path(output_path, f'{image_name[:-4].replace(".", "")}-row-{row_id}.jpg')
            # save_row(img, words, top, height, left, col_width, w, row_file)
            if name_parts:
                height = int(round(statistics.mean(heights)))
                top = int(round(statistics.mean(tops)))
                name_string = ' '.join(name_parts).replace('”', '"').replace('»', '"').replace('’', "'")
                # print(name_string)
                # Removes non-word characters & most punctuation
                cleaned_name = re.sub(r'[^&%\(\)\"\w\s\/\-\']', '', name_string)
                # OCR seems to turn trailing dots into these words (and perhaps others)
                cleaned_name = re.sub(r'\s*\b(?:on|an|we|ee|oe|os|vs|\s)+$', '', cleaned_name).strip()
                if cleaned_name and not cleaned_name.isspace():
                    # print(left, top, height, cleaned_name)
                    col_data.append({'vol_id': vol_id, 'page_id': page_id, 'col_id': col_id, 'row_id': row_id, 'text': cleaned_name, 'left': left, 'top': top, 'height': height, 'right': right})
                    if save_markup:
                        cv2.line(new_img, (0, top + height), (w * 2, top + height), (255, 0, 0), 2)
                        ft.putText(new_img, cleaned_name, (w + 20, top + height), fontHeight=40, color=(0, 0, 0), thickness=-1, line_type=cv2.LINE_AA, bottomLeftOrigin=True)
                    row_file = Path(output_path, f'{image_name[:-4].replace(".", "")}-row-{row_id}.jpg')
                    save_row(img, words, top, height, left, col_width, w, row_file)
                    row_id += 1
    if save_markup:
        markup_img = Path(rowcol_path, f'{image_name[:-4]}-rows.jpg')
        cv2.imwrite(str(markup_img), new_img)
    return col_data
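# `find_text` can be tested on a single column image before committing to a full run.
# A minimal sketch, again with hypothetical paths (note that the cropped row images are
# written under /Volumes/bigdata/mydata/stockexchange/rows/, as hard-coded above):

# In[ ]:

# Hypothetical test paths -- adjust to a real volume directory and column image
test_vol = Path('/Volumes/bigdata/mydata/stockexchange/processed/AU NBAC N193-103')
test_img = Path(test_vol, 'columns', 'N193-103_0001-col-1.jpg')
if test_img.exists():
    rows = find_text(test_vol, test_img, save_markup=False)
    print(pd.DataFrame(rows).head())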
# ## Process directories

# In[ ]:

input_path = Path('/Volumes/bigdata/mydata/stockexchange/processed')
start_vol = 103
for vol_dir in tqdm([d for d in input_path.glob('*') if d.is_dir()], desc='Directories'):
    # print(vol_dir)
    vol_num = int(re.search(r'(\d+)$', vol_dir.name).group(1))
    vol_data = []
    columns_dir = Path(vol_dir, 'columns')
    if vol_num >= start_vol:
        for img_name in tqdm([i for i in columns_dir.glob('*.jpg')]):
            # print(img_name)
            vol_data += find_text(vol_dir, img_name, save_markup=True)
        df_text = pd.DataFrame(vol_data).sort_values(by=['vol_id', 'page_id', 'col_id', 'row_id'])
        df_text.to_csv(f'vol-{vol_num}-text.csv')
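# The loop above writes one CSV per volume. A minimal sketch for gathering them into a single
# file afterwards, assuming the per-volume CSVs are in the current working directory (the
# combined filename below is just an example):

# In[ ]:

csv_files = sorted(Path('.').glob('vol-*-text.csv'))
if csv_files:
    # index_col=0 skips the unnamed index column written by to_csv above
    df_all = pd.concat([pd.read_csv(csv, index_col=0) for csv in csv_files], ignore_index=True)
    df_all.to_csv('all-volumes-text.csv', index=False)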