This notebook is part of the "How to export your Pocket data and migrate to Omnivore" tutorial.
First, let's upload the ril_export.html file generated at https://getpocket.com/export
The HTML has the following structure: an <h1> "Unread" heading followed by a <ul> whose list items contain <a> elements. The href is the link to the article and the anchor text is the title; each anchor also has tags and time_added attributes. A second <h1> "Read" heading follows, with a <ul> like the one above. We will transform this into a list of dicts.
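Each entry will carry the fields that process_list further down extracts; a sketch of the shape, with purely illustrative values:

from datetime import datetime

{
    'read': False,                                # True for items under the "Read" heading
    'time_added': datetime(2023, 1, 1, 12, 0),    # parsed from the time_added attribute
    'href': 'https://example.com/some-article',   # the link to the article
    'tags': ['python', 'reading'],                # split from the comma-separated tags attribute
    'title': 'Some article title',                # the anchor text
}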
from ipywidgets import FileUpload
from IPython.display import display
from datetime import datetime
from pathlib import Path
import logging
upload = FileUpload(accept='.html', multiple=False)
def save_file(change):
    # The observer callback receives a change dict; we only need the widget's value.
    for v in upload.value:
        content = v['content']
        with open(v['name'], 'wb') as f:
            f.write(bytes(content))
upload.observe(save_file, names='value')
display(upload)
assert Path("ril_export.html").exists(), "Upload the file before continuing"
from bs4 import BeautifulSoup
import os
OMNIVORE_API_URL = "https://api-prod.omnivore.app/api/graphql"
# The API key will have the following format "00000000-0000-0000-0000-000000000000"
OMNIVORE_API_KEY = os.environ.get('OMNIVORE_API_KEY')
SCHEMA_URL = "https://raw.githubusercontent.com/omnivore-app/omnivore/c9fcbe72ddc6f40dd06e7073b8ffe3c1e71bd650/packages/api/src/generated/schema.graphql"
REQUESTS_SLEEP_TIME = 60 # Number of seconds
PARALLEL_API_CALL_SIZE = 4
if not OMNIVORE_API_KEY:
OMNIVORE_API_KEY = input('Enter your Omnivore API key (it should look like 00000000-0000-0000-0000-000000000000): ')
with open('ril_export.html', 'r') as f:
soup = BeautifulSoup(f, 'html.parser')
soup.title
Extract the articles and tags from the HTML doc.
def process_list(h1):
    ul = h1.find_next_sibling('ul')
    anchors = ul.find_all('a', href=True)
    print(len(anchors), h1.text, 'articles')
    read = h1.text != 'Unread'
    items = []
    for a in anchors:
        items.append({
            'read': read,
            'time_added': datetime.fromtimestamp(int(a['time_added'])),
            'href': a['href'],
            'tags': a['tags'].split(',') if a['tags'] else [],
            'title': a.text,
        })
    return items
articles = [item for h1 in soup.find_all('h1') for item in process_list(h1)]
labels = {tag for article in articles for tag in article['tags']}
We want to be able to track our progress, since the Omnivore API is rate limited and uploading all the articles takes a while. For this we will use a SQLite database.
We will store the articles and the tags in the two tables defined below; print_table_sql shows the SQL that peewee generates for each.
from enum import Enum
import peewee
from peewee import SqliteDatabase, Model
from playhouse.sqlite_ext import JSONField
from playhouse.reflection import print_table_sql
class StrEnum(str, Enum):
pass
db = peewee.SqliteDatabase('omnivore.db')
class BaseModel(peewee.Model):
class Meta:
database = db
class ArticleStatus(StrEnum):
PROCESSING = "PROCESSING"
SUCCEEDED = "SUCCEEDED"
RETRY = "_RETRY"
DONE = "_DONE"
class Article(BaseModel):
id = peewee.TextField(null=True)
read = peewee.BooleanField()
time_added = peewee.DateTimeField()
tags = JSONField(default=list)  # callable default, so instances don't share one mutable list
href = peewee.TextField(primary_key=True)
status = peewee.TextField(null=True, choices=ArticleStatus)
slug = peewee.TextField(null=True)
class Tag(BaseModel):
id = peewee.TextField(null=True)
name = peewee.TextField(primary_key=True)
print_table_sql(Article)
print_table_sql(Tag)
# Create the Article and Tag tables
db.connect()
db.create_tables([Article, Tag])
for label in labels:
Tag.get_or_create(name=label)
# insert_tag_sql = f"""INSERT OR IGNORE into tags (name) values (?)"""
# cursor.executemany(insert_tag_sql, [(label,) for label in labels])
for article in articles:
    Article.get_or_create(
        href=article['href'],
        defaults={
            'read': article['read'],
            'time_added': article['time_added'],
            'tags': article['tags'],
        },
    )
import requests
with requests.get(SCHEMA_URL) as r:
r.raise_for_status()
schema = r.text
assert schema is not None
print(schema[:100])
from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport
def create_client():
transport = RequestsHTTPTransport(
url=OMNIVORE_API_URL,
headers = {
'authorization': OMNIVORE_API_KEY,
}
)
return Client(transport=transport, schema=schema, fetch_schema_from_transport=False, execute_timeout=None)
# Doing a "test query" to check if everything is correct
with create_client() as session:
r = session.execute(gql("""
query Viewer {
me {
id
name
profile {
username
}
}
}
"""))
result = r
USERNAME = result['me']['profile']['username']
print(f"Hello {result['me']['name']} ({USERNAME})!")
def getExistingTags():
with create_client() as session:
r = session.execute(gql("""
query Labels {
labels {
...on LabelsSuccess {
labels { name, id }
}
}
}
"""))
result = r
return result['labels']['labels']
# Then drop the tags that already exist on the server from the ones we still need to create
def saveTags(tagName):
    with create_client() as client:
        # Pass the name as a GraphQL variable instead of interpolating it into the query string.
        mutation = gql("""
        mutation CreateLabel($name: String!) {
          createLabel(input: {color: "#F00", name: $name}) {
            ... on CreateLabelSuccess {
              label {
                id
                name
                color
                description
                createdAt
              }
            }
            ... on CreateLabelError {
              errorCodes
            }
          }
        }
        """)
        r = client.execute(mutation, {'name': str(tagName)})
        print(r)
        return r['createLabel']['label']['id']
server_tags = getExistingTags()
unsaved_tags = list(Tag.select().where(Tag.id.is_null()))
# tagIds = {f"{dictionary['name']}": dictionary["id"] for dictionary in presaved_tags + server_tags}
name2id = { t['name']:t['id'] for t in server_tags }
for tag in unsaved_tags:
    if tag.name not in name2id:
        try:
            tag_id = saveTags(tag.name)
            name2id[tag.name] = tag_id
        except Exception as e:
            print("An error occurred:", e)
tags = [Tag(name=k, id=v) for k, v in name2id.items()]
# query = "UPDATE tags SET id = ? where name = ?"
# cursor.executemany(query, [(value, key) for key, value in tagIds.items()])
Tag.bulk_update(tags, fields=[Tag.id])
all_tags = list(Tag.select())
all_tags
import backoff
createArticle = gql("""
mutation CreateArticleSavingRequest($url: String!) {
createArticleSavingRequest(input: {url: $url}) {
... on CreateArticleSavingRequestSuccess {
articleSavingRequest {
id
status
slug
createdAt
updatedAt
url
errorCode
}
}
... on CreateArticleSavingRequestError {
errorCodes
}
}
}
""")
setLabels = gql("""
mutation SetLabel($articleId: ID!, $labelIds: [ID!]!) {
setLabels(input: {pageId: $articleId, labelIds: $labelIds}) {
...on SetLabelsSuccess {
labels {
id
}
}
}
}
""")
updatePageSavedDate = gql("""
mutation UpdatePageDate($id: ID!, $date: Date!) {
updatePage(input: {pageId: $id, savedAt: $date, publishedAt: $date}) {
... on UpdatePageSuccess {
updatedPage {
id
savedAt
publishedAt
title
}
}
...on UpdatePageError {
errorCodes
}
}
}
""")
archivePage = gql("""
mutation ArchivePage($id: ID!) {
setLinkArchived (input: {linkId: $id, archived: true}) {
... on ArchiveLinkSuccess {
linkId
message
}
... on ArchiveLinkError {
message
errorCodes
}
}
}
""")
createTag = gql("""
mutation CreateLabel($nam: String!, $col: String, $desc: String) {
createLabel(input: {name: $nam, color: $col, description: $desc}) {
... on CreateLabelSuccess {
label {
id
name
color
description
createdAt
}
}
... on CreateLabelError {
errorCodes
}
}
}
""")
getArticleSavingRequest = gql("""
query ArticleSavingRequest($id: ID!) {
articleSavingRequest(id: $id) {
... on ArticleSavingRequestSuccess {
articleSavingRequest {
id
status
slug
createdAt
updatedAt
url
errorCode
}
}
... on ArticleSavingRequestError {
errorCodes
}
}
}
""")
getArticle = gql("""
query GetArticle($slug: String!, $username: String!) {
article(slug: $slug, username: $username) {
...on ArticleSuccess {
article {
id
title
slug
isArchived
savedAt
}
}
...on ArticleError {
errorCodes
}
}
}
""")
# All of the helpers below share the same rate-limit handling: each call uses a fresh
# client, and when Omnivore answers with HTTP 429 the function returns the transport
# object instead of a result. The predicate below detects that, and backoff.runtime
# sleeps until the RateLimit-Reset header value (plus one second) before retrying.
@backoff.on_predicate(
    backoff.runtime,
    predicate=lambda r: isinstance(r, RequestsHTTPTransport),
    value=lambda r: int(r.response_headers["RateLimit-Reset"]) + 1,
    jitter=None,
)
def archiveArticle(articleId):
    with create_client() as client:
        try:
            res = client.execute(archivePage, {'id': articleId})
            return res
        except Exception as e:
            if hasattr(e, 'code') and e.code == 429:
                # Hand the transport back so the backoff decorator can wait out the rate limit.
                return client.transport
            raise
@backoff.on_predicate(
backoff.runtime,
predicate=lambda r: isinstance(r, RequestsHTTPTransport),
value=lambda r: int(r.response_headers["RateLimit-Reset"]) + 1,
jitter=None,
)
def saveLabels(articleId, labels):
with create_client() as client:
try:
return client.execute(setLabels, {'articleId': articleId, 'labelIds': labels})
except Exception as e:
    if hasattr(e, 'code') and e.code == 429:
        # Hand the transport back so the backoff decorator can wait out the rate limit.
        return client.transport
    raise
@backoff.on_predicate(
backoff.runtime,
predicate=lambda r: isinstance(r, RequestsHTTPTransport),
value=lambda r: int(r.response_headers["RateLimit-Reset"]) + 1,
jitter=None,
)
def saveArticle(article):
with create_client() as client:
try:
logging.info(f"Saving article {article.href}")
url = article.href
tags = article.tags
# First createArticleSavingRequest
r = client.execute(createArticle, variable_values={'url': url})
article.id = r['createArticleSavingRequest']['articleSavingRequest']['id']
article.status = r['createArticleSavingRequest']['articleSavingRequest']['status']
if len(tags) != 0:
saveLabels(article.id, tags)
# Return the article with the id of the saved document
return article
except Exception as e:
    if hasattr(e, 'code') and e.code == 429:
        # Hand the transport back so the backoff decorator can wait out the rate limit.
        return client.transport
    # I don't know why this happens and I will figure it out later.
    raise
@backoff.on_predicate(
backoff.runtime,
predicate=lambda r: isinstance(r, RequestsHTTPTransport),
value=lambda r: int(r.response_headers["RateLimit-Reset"]) + 1,
jitter=None,
)
def updateArticleTimeAfterProcessing(articleId, date: datetime = None):
with create_client() as client:
try:
if date is not None:
# Wait a bit, it seems there's a race condition.
res = client.execute(updatePageSavedDate, {
'id': articleId,
'date': date.isoformat(),
})
return res
except Exception as e:
    if hasattr(e, 'code') and e.code == 429:
        # Hand the transport back so the backoff decorator can wait out the rate limit.
        return client.transport
    # I don't know why this happens and I will figure it out later.
    raise
@backoff.on_predicate(
backoff.runtime,
predicate=lambda r: isinstance(r, RequestsHTTPTransport),
value=lambda r: int(r.response_headers["RateLimit-Reset"]) + 1,
jitter=None,
)
def articleSavingRequest(articleId):
with create_client() as client:
try:
# Wait a bit, it seems there's a race condition.
res = client.execute(getArticleSavingRequest, {
'id': articleId,
})
return res['articleSavingRequest']['articleSavingRequest']
except Exception as e:
    if hasattr(e, 'code') and e.code == 429:
        # Hand the transport back so the backoff decorator can wait out the rate limit.
        return client.transport
    # I don't know why this happens and I will figure it out later.
    raise
@backoff.on_predicate(
backoff.runtime,
predicate=lambda r: isinstance(r, RequestsHTTPTransport),
value=lambda r: int(r.response_headers["RateLimit-Reset"]) + 1,
jitter=None,
)
def getArticleInfo(slug):
with create_client() as client:
try:
# Wait a bit, it seems there's a race condition.
res = client.execute(getArticle, {
'slug': slug,
'username': USERNAME,
})
return res['article']['article']
except Exception as e:
    if hasattr(e, 'code') and e.code == 429:
        # Hand the transport back so the backoff decorator can wait out the rate limit.
        return client.transport
    # I don't know why this happens and I will figure it out later.
    raise
Off-the-shelf thread-pool helpers fall a bit short for our usage, so I will drive a ThreadPoolExecutor by hand: a fixed number of worker threads pull submitted tasks off a queue, and articles that are still processing get resubmitted until they finish.
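A minimal sketch of that pattern in isolation, before the real loop further down (toy_check is a made-up stand-in for checkTask, and the items are arbitrary integers):

import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def toy_check(item):
    # Stand-in for checkTask: pretend the item is "done" about half of the time.
    return item if random.random() < 0.5 else None

with ThreadPoolExecutor(4) as executor:
    pending = {executor.submit(toy_check, i): i for i in range(10)}
    while pending:
        # as_completed works on a snapshot, so resubmitting inside the loop is safe.
        for future in as_completed(list(pending)):
            item = pending.pop(future)
            if future.result() is None:
                # Not done yet: put it back on the queue, like the _RETRY handling below.
                pending[executor.submit(toy_check, item)] = item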
import datetime as dt
import pytz
def checkTask(article):
    """Run a check against Omnivore; if the article has been processed, update its
    saved date and archive status, then return the updated article object.
    """
    asr = articleSavingRequest(article.id)
    article.status = asr['status']
    article.slug = asr['slug']
    if article.status != 'SUCCEEDED':
        print("Retrying article", asr)
        return article
    updateArticleTimeAfterProcessing(article.id, article.time_added)
    if article.read:
        archiveArticle(article.id)
    omnArt = getArticleInfo(asr['slug'])
    dtexpected = article.time_added.replace(tzinfo=pytz.UTC)
    # fromisoformat() rejects a trailing 'Z' before Python 3.11, so normalise it first.
    dtgot = datetime.fromisoformat(omnArt['savedAt'].replace('Z', '+00:00'))
    c1 = article.read == omnArt['isArchived']
    c2 = abs((dtexpected - dtgot).total_seconds()) < 3600
    if not c1 or not c2:
        logstr = f"Retrying article {omnArt['slug']}, "
        if not c1:
            logstr += f"incorrect archive status: expected {article.read}, got {omnArt['isArchived']}. "
        if not c2:
            logstr += f"datetime not changed: expected {dtexpected}, got {dtgot}"
        logging.info(logstr)
        article.status = '_RETRY'
    return article
Article.select()
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing.pool import ThreadPool
from tqdm.notebook import tqdm
from threading import Lock
TIMEOUT = 120 # seconds
unsaved_articles = list(Article.select().where(Article.id.is_null()))
submitted_articles = list(Article.select().where(Article.id.is_null(False) & (Article.status != ArticleStatus.DONE)))
done_articles = list(Article.select().where(Article.status == ArticleStatus.DONE))
all_articles = unsaved_articles + submitted_articles + done_articles
assert len(all_articles) == len(Article.select())
tqdm_custom = lambda desc: tqdm(desc=desc, total=len(all_articles))
with (ThreadPoolExecutor(PARALLEL_API_CALL_SIZE) as executor,
tqdm_custom('Submit') as pbar1,
tqdm_custom('Done') as pbar2):
try:
unchecked_articles = {}
# TODO: Load from db and repopulate unsaved and unchecked articles
fmap1 = { executor.submit(saveArticle, article) : article for article in unsaved_articles }
print(f"Skipping submit of {len(submitted_articles)} articles")
pbar1.update(len(submitted_articles))
print(f"Skipping update of {len(done_articles)} previously done articles")
pbar1.update(len(done_articles))
pbar2.update(len(done_articles))
for f in as_completed(fmap1, timeout=TIMEOUT):
try:
newarticle = f.result()
pbar1.update()
unchecked_articles[executor.submit(checkTask, newarticle)] = newarticle
newarticle.save()
# Save state to db if needed
except Exception as e:
logging.error(repr(e))
for a in submitted_articles:
unchecked_articles[executor.submit(checkTask, a)] = a
pbar1.refresh()
pbar2.refresh()
while unchecked_articles:
try:
for f in as_completed(unchecked_articles, timeout=TIMEOUT*2):
newarticle = f.result()
if newarticle.status == 'SUCCEEDED': # Task completed
newarticle.status = ArticleStatus.DONE
newarticle.save()
pbar2.update()
elif newarticle.status == 'PROCESSING' or newarticle.status == '_RETRY':
unchecked_articles[executor.submit(checkTask, newarticle)] = newarticle
else:
raise ValueError(f"Unknown article status: {newarticle.status}")
unchecked_articles.pop(f)
except Exception as e:
logging.error(repr(e))
except KeyboardInterrupt:
executor.shutdown(cancel_futures=True)
raise