#!/usr/bin/env python
# coding: utf-8

# # Extract Section Hierarchy from WPM pages
# Data is extracted from public HTML pages and written to the `population_wpm_sections` table. Extraction details:
# - fetch revision IDs for all pages found in the `enwiki-20190420-pages-articles-multistream.xml.bz2` dump file
# - fetch HTML using the revision_id for each WP:M page
# - write page and section H2 data to `population_wpm_sections` for use in other notebooks

# In[1]:


get_ipython().run_line_magic('run', "-i 'data-defaults.py'")

import re
import urllib.error
import urllib.request

from bs4 import BeautifulSoup
from pyspark.sql import Row


# In[2]:


# fetch page HTML for a revision and map each section_id to its parent H2 heading
def extract_h2s(row):
    page_id = str(row['page_id'])
    rev_id = str(row['rev_id'])
    url = 'https://en.wikipedia.org/?oldid=' + rev_id
    sections = list()
    try:
        wpm_page = urllib.request.urlopen(url)
    except urllib.error.HTTPError:
        print('error fetching data for: ' + str(row))
        # return the empty list so callers can still iterate over the result
        return sections
    soup = BeautifulSoup(wpm_page.read(), features="lxml")
    h2 = ""
    for span in soup.find_all(class_="mw-headline"):
        # track the most recent H2 so lower-level headings inherit it
        if span.parent.name == 'h2':
            h2 = span.get('id')
        sections.append(Row(page_id=page_id, section_h2=h2, section_id=span.get('id')))
    return sections


# In[3]:


# only need revision ids for the 20190420 dump
WIKIPEDIA_XML_DUMPS = ['enwiki-20190420-pages-articles-multistream.xml.bz2']


def page_rev(entity, date):
    return Row(page_id=entity.id, rev_id=entity.revision.id, dt=date)


page_revs_rdd = sc.emptyRDD()
for file in WIKIPEDIA_XML_DUMPS:
    wikipedia = sqlContext.read.format('com.databricks.spark.xml').options(rowTag='page').load(file)
    dump_date = re.search(r'.*(\d{8}).*', file).group(1)
    # keep main-namespace, non-redirect pages with non-empty text
    articles = wikipedia\
        .filter("ns = '0'")\
        .filter("redirect._title is null") \
        .filter("revision.text._VALUE is not null") \
        .filter("length(revision.text._VALUE) > 0")
    daily_page_revs = sqlContext.createDataFrame(articles.rdd.map(lambda entity: page_rev(entity, dump_date)))
    page_revs_rdd = page_revs_rdd.union(daily_page_revs.rdd)

page_revs_merged = sqlContext.createDataFrame(page_revs_rdd)
page_revs_merged.registerTempTable("page_revs_date")


# In[4]:


# extract WP:M page revisions for 20190420
query = """
SELECT DISTINCT page_id, rev_id
FROM page_revs_date
WHERE page_id IN (SELECT page_id FROM ryanmax.population_wpm_pages_with_extlinks)
AND dt = '20190420'
"""

wpm_revs_pandas = spark.sql(query).toPandas()
wpm_revs_pandas.describe()


# In[5]:


# fetch HTML for every WP:M revision and collect the section rows
all_sections = list()
i = 0
for index, row in wpm_revs_pandas.iterrows():
    for section in extract_h2s(row):
        all_sections.append(section)
    i += 1
    if i % 1000 == 0:
        print(str(i) + " pages fetched")

sections_df = spark.createDataFrame(all_sections)


# In[6]:


sections_df.toPandas().describe()


# In[7]:


# write section data to a table for later use
sections_df.registerTempTable("temp_population_wpm_sections")
sqlContext.sql("DROP TABLE IF EXISTS ryanmax.population_wpm_sections")
sqlContext.sql("CREATE TABLE ryanmax.population_wpm_sections AS SELECT * FROM temp_population_wpm_sections")


# In[ ]:
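

# A minimal sketch of how another notebook might read the section data back,
# as mentioned in the header above. It assumes the same `spark` session from
# data-defaults.py and the `ryanmax.population_wpm_sections` table created in
# the previous cell; the `top_h2s` name is illustrative only.
top_h2s = spark.sql("""
    SELECT section_h2, COUNT(*) AS num_sections
    FROM ryanmax.population_wpm_sections
    GROUP BY section_h2
    ORDER BY num_sections DESC
    LIMIT 10
""")
top_h2s.show()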