#!/usr/bin/env python
# coding: utf-8

# # Extract Section Hierarchy from WPM pages
# Data is extracted from public HTML pages and written to the `population_wpm_sections` table. Extraction details:
# - fetch revision IDs for all pages found in the `enwiki-20190420-pages-articles-multistream.xml.bz2` dump file
# - fetch HTML using the revision_id for each WP:M page
# - write page and section H2 data to `population_wpm_sections` for use in other notebooks

# In[1]:


get_ipython().run_line_magic('run', "-i 'data-defaults.py'")

import re
import urllib.error
import urllib.request

from bs4 import BeautifulSoup
from pyspark.sql import Row


# In[2]:


# fetch page HTML for a revision and map each section_id to its parent H2 heading
def extract_h2s(row):
    page_id = str(row['page_id'])
    rev_id = str(row['rev_id'])
    url = 'https://en.wikipedia.org/?oldid=' + rev_id
    sections = list()
    try:
        wpm_page = urllib.request.urlopen(url)
    except urllib.error.HTTPError:
        print('error fetching data for: ' + str(row))
        # return the empty list so callers can still iterate over the result
        return sections
    soup = BeautifulSoup(wpm_page.read(), features="lxml")
    h2 = ""
    for span in soup.find_all(class_="mw-headline"):
        # track the most recent H2 so lower-level headings inherit it
        if span.parent.name == 'h2':
            h2 = span.get('id')
        sections.append(Row(page_id=page_id, section_h2=h2, section_id=span.get('id')))
    return sections


# In[3]:


# only need revision ids for the 20190420 dump
WIKIPEDIA_XML_DUMPS = ['enwiki-20190420-pages-articles-multistream.xml.bz2']


def page_rev(entity, date):
    return Row(page_id=entity.id, rev_id=entity.revision.id, dt=date)


page_revs_rdd = sc.emptyRDD()
for file in WIKIPEDIA_XML_DUMPS:
    wikipedia = sqlContext.read.format('com.databricks.spark.xml').options(rowTag='page').load(file)
    dump_date = re.search(r'.*(\d{8}).*', file).group(1)
    # keep main-namespace, non-redirect pages with non-empty text
    articles = wikipedia\
        .filter("ns = '0'")\
        .filter("redirect._title is null") \
        .filter("revision.text._VALUE is not null") \
        .filter("length(revision.text._VALUE) > 0")
    daily_page_revs = sqlContext.createDataFrame(articles.rdd.map(lambda entity: page_rev(entity, dump_date)))
    page_revs_rdd = page_revs_rdd.union(daily_page_revs.rdd)

page_revs_merged = sqlContext.createDataFrame(page_revs_rdd)
page_revs_merged.registerTempTable("page_revs_date")


# In[4]:


# extract WP:M page revisions for 20190420
query = """
SELECT DISTINCT page_id, rev_id
FROM page_revs_date
WHERE page_id IN (SELECT page_id FROM ryanmax.population_wpm_pages_with_extlinks)
AND dt = '20190420'
"""

wpm_revs_pandas = spark.sql(query).toPandas()
wpm_revs_pandas.describe()


# In[5]:


# fetch HTML for every WP:M revision and collect the section rows
all_sections = list()
i = 0
for index, row in wpm_revs_pandas.iterrows():
    for section in extract_h2s(row):
        all_sections.append(section)
    i += 1
    if i % 1000 == 0:
        print(str(i) + " pages fetched")

sections_df = spark.createDataFrame(all_sections)


# In[6]:


sections_df.toPandas().describe()


# In[7]:


# write section data to a table for later use
sections_df.registerTempTable("temp_population_wpm_sections")
sqlContext.sql("DROP TABLE IF EXISTS ryanmax.population_wpm_sections")
sqlContext.sql("CREATE TABLE ryanmax.population_wpm_sections AS SELECT * FROM temp_population_wpm_sections")


# In[ ]:
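

# A minimal sketch of how another notebook might read the section data back,
# as mentioned in the header above. It assumes the same `spark` session from
# data-defaults.py and the `ryanmax.population_wpm_sections` table created in
# the previous cell; the `top_h2s` name is illustrative only.
top_h2s = spark.sql("""
    SELECT section_h2, COUNT(*) AS num_sections
    FROM ryanmax.population_wpm_sections
    GROUP BY section_h2
    ORDER BY num_sections DESC
    LIMIT 10
""")
top_h2s.show()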