#!/usr/bin/env python # coding: utf-8 # # Conda-Forge travis queue status # # Fetch and show a summary of conda-forge's travis queue # In[1]: import os import yaml with open(os.path.expanduser('~/.travis/config.yml')) as f: cfg = yaml.safe_load(f) token = cfg['endpoints']['https://api.travis-ci.org/']['access_token'] # In[2]: import os import asyncio from asyncio import get_event_loop, Semaphore import aiohttp from collections import defaultdict loop = get_event_loop() all_jobs = defaultdict(lambda : []) all_builds = defaultdict(lambda : []) headers = { 'Travis-API-Version': '3', 'Accept': 'application/json', 'Authorization': f'token {token}', } TRAVIS = "https://api.travis-ci.org" # limit concurrent requests to travis API sem = Semaphore(20) async def list_repos(url): futures = [] async with aiohttp.ClientSession() as session: async with sem, session.get(url, headers=headers) as response: data = await response.json() for repo in data['repositories']: if repo['active']: futures.append(asyncio.ensure_future(list_builds(repo, sem))) finished = [ ] so_far = data['@pagination']['offset'] + data['@pagination']['limit'] total = data['@pagination']['count'] if data['@pagination']['is_last']: so_far = total print(f"{so_far}/{total} repos") if not data['@pagination']['is_last']: next_url = f"{TRAVIS}{data['@pagination']['next']['@href']}" next_f = asyncio.ensure_future(list_repos(next_url)) else: next_f = None await asyncio.gather(*futures) print(f"{len(all_builds)} builds, {len(all_jobs)} jobs") if next_f: await next_f async def list_builds(repo, sem): url_repo = f'{TRAVIS}/repo/{repo["id"]}/builds?state=created' if repo['name'] in all_jobs: # short-circuit re-runs return async with sem, aiohttp.ClientSession() as session: data_build = await session.get(url_repo, headers=headers) builds = await data_build.json() builds = builds['builds'] repo_name = repo['name'] futures = [] for build in builds: all_builds[repo_name].append(build) for job in build['jobs']: futures.append(asyncio.ensure_future(get_job(job, sem))) await asyncio.gather(*futures) async def get_job(job_info, sem): url = f"{TRAVIS}{job_info['@href']}" async with sem, aiohttp.ClientSession() as session: r = await session.get(url, headers=headers) job = await r.json() if job['state'] == 'canceled': # omit canceled jobs return all_jobs[job['repository']['name']].append(job) def get_all_builds(owner): return list_repos(f"{TRAVIS}/owner/{owner}/repos") # In[3]: get_ipython().run_cell_magic('time', '', "loop.run_until_complete(get_all_builds('conda-forge'))\n") # Get all builds, sorted by `created_at` # In[4]: from itertools import chain from operator import itemgetter def get_date(job): """get date for sorting Use updated_at for 'created' jobs, which includes resubmit info. This won't be correct once the job has started, so fallback on 'created' date for those. """ if job['state'] == 'created': return job['updated_at'] else: return job['created_at'] ordered_jobs = sorted(chain(*all_jobs.values()), key=get_date) # Show a view of the current queue # In[5]: from dateutil.parser import parse as parse_date from datetime import datetime, timedelta, timezone now = datetime.now().astimezone(timezone.utc) all_authors = set(build['created_by']['login'] for build in chain(*all_builds.values())) lines = [ f"## Conda-Forge travis queue status as of {now.strftime('%Y-%m-%d %H:%M UTC')}", "", f"{len(all_builds)} builds and {len(all_jobs)} jobs", "", f"Total unique authors: {len(all_authors)}", "", ] i = 1 for i, job in enumerate(ordered_jobs): repo = job['repository']['slug'] url = f"https://travis-ci.org/{repo}/jobs/{job['id']}" date = parse_date(get_date(job)) ago = now - date if ago < timedelta(hours=24): ago_s = f"{ago.total_seconds() / 3600:.1f} hours" else: ago_s = f"{ago.days} days" s = f"{i}. {ago_s} ago [{repo}#{job['number']}]({url})" if job['state'] != 'created': s+= f" {job['state']}" lines.append(s) from IPython.display import Markdown Markdown('\n'.join(lines))