# IPython autoreload extension: mode 2 reloads imported modules before running
# code, so edits to the local optimus sources are picked up without a restart.
%load_ext autoreload
%autoreload 2
import sys
# Make the parent directory importable; presumably the local `optimus`
# package lives there (TODO confirm against the repository layout).
sys.path.append("..")
from optimus import Optimus
C:\Users\argenisleon\Anaconda3\lib\site-packages\dask\config.py:161: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details. data = yaml.load(f.read()) or {} C:\Users\argenisleon\Anaconda3\lib\site-packages\statsmodels\compat\pandas.py:49: FutureWarning: The Panel class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version data_klasses = (pandas.Series, pandas.DataFrame, pandas.Panel)
# Create the Optimus entry point on the "dask" engine; `op` is reused below
# for loading data (op.load.csv), both interactively and inside the Prefect task.
op = Optimus("dask")
# Display the engine's client as the cell output (rendered as the
# Client/Cluster summary that follows).
op.client
Client
|
Cluster
|
# !pip install prefect
import datetime
import os
import prefect
from prefect import task
from prefect.engine.signals import SKIP
from prefect.tasks.shell import ShellTask
file = "data/crime.csv"
# Read the crime CSV through Optimus and persist it. Then split OCCURRED_ON_DATE
# on "-" into three new columns (year_0, year_1, year_2) while keeping the
# original column, and finally drop the original YEAR column.
df = (
    op.load.csv(
        file,
        sep=",",
        error_bad_lines=False,
        header=True,
        null_value="null",
        infer_schema='true',
        charset="latin1",
    )
    .persist()
    .cols.unnest(
        "OCCURRED_ON_DATE",
        separator="-",
        splits=3,
        output_cols="year",
        drop=False,
    )
    .cols.drop("YEAR")
)
# Materialize the lazy frame; as the cell's last expression it is displayed.
df.compute()
INCIDENT_NUMBER | OFFENSE_CODE | OFFENSE_CODE_GROUP | OFFENSE_DESCRIPTION | DISTRICT | REPORTING_AREA | SHOOTING | OCCURRED_ON_DATE | MONTH | DAY_OF_WEEK | HOUR | UCR_PART | STREET | Lat | Long | Location | year_0 | year_1 | year_2 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | I182070945 | 619 | Larceny | LARCENY ALL OTHERS | D14 | 808 | NaN | 2018-09-02 13:00:00 | 9 | Sunday | 13 | Part One | LINCOLN ST | 42.357791 | -71.139371 | (42.35779134, -71.13937053) | 2018 | 09 | 02 13:00:00 |
1 | I182070943 | 1402 | Vandalism | VANDALISM | C11 | 347 | NaN | 2018-08-21 00:00:00 | 8 | Tuesday | 0 | Part Two | HECLA ST | 42.306821 | -71.060300 | (42.30682138, -71.06030035) | 2018 | 08 | 21 00:00:00 |
2 | I182070941 | 3410 | Towed | TOWED MOTOR VEHICLE | D4 | 151 | NaN | 2018-09-03 19:27:00 | 9 | Monday | 19 | Part Three | CAZENOVE ST | 42.346589 | -71.072429 | (42.34658879, -71.07242943) | 2018 | 09 | 03 19:27:00 |
3 | I182070940 | 3114 | Investigate Property | INVESTIGATE PROPERTY | D4 | 272 | NaN | 2018-09-03 21:16:00 | 9 | Monday | 21 | Part Three | NEWCOMB ST | 42.334182 | -71.078664 | (42.33418175, -71.07866441) | 2018 | 09 | 03 21:16:00 |
4 | I182070938 | 3114 | Investigate Property | INVESTIGATE PROPERTY | B3 | 421 | NaN | 2018-09-03 21:05:00 | 9 | Monday | 21 | Part Three | DELHI ST | 42.275365 | -71.090361 | (42.27536542, -71.09036101) | 2018 | 09 | 03 21:05:00 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
319068 | I050310906-00 | 3125 | Warrant Arrests | WARRANT ARREST | D4 | 285 | NaN | 2016-06-05 17:25:00 | 6 | Sunday | 17 | Part Three | COVENTRY ST | 42.336951 | -71.085748 | (42.33695098, -71.08574813) | 2016 | 06 | 05 17:25:00 |
319069 | I030217815-08 | 111 | Homicide | MURDER, NON-NEGLIGIENT MANSLAUGHTER | E18 | 520 | NaN | 2015-07-09 13:38:00 | 7 | Thursday | 13 | Part One | RIVER ST | 42.255926 | -71.123172 | (42.25592648, -71.12317207) | 2015 | 07 | 09 13:38:00 |
319070 | I030217815-08 | 3125 | Warrant Arrests | WARRANT ARREST | E18 | 520 | NaN | 2015-07-09 13:38:00 | 7 | Thursday | 13 | Part Three | RIVER ST | 42.255926 | -71.123172 | (42.25592648, -71.12317207) | 2015 | 07 | 09 13:38:00 |
319071 | I010370257-00 | 3125 | Warrant Arrests | WARRANT ARREST | E13 | 569 | NaN | 2016-05-31 19:35:00 | 5 | Tuesday | 19 | Part Three | NEW WASHINGTON ST | 42.302333 | -71.111565 | (42.30233307, -71.11156487) | 2016 | 05 | 31 19:35:00 |
319072 | 142052550 | 3125 | Warrant Arrests | WARRANT ARREST | D4 | 903 | NaN | 2015-06-22 00:12:00 | 6 | Monday | 0 | Part Three | WASHINGTON ST | 42.333839 | -71.080290 | (42.33383935, -71.08029038) | 2015 | 06 | 22 00:12:00 |
319073 rows × 19 columns
from prefect import Parameter, Flow
@task
def run_job(file="data/crime.csv"):
    """Load and reshape the crime CSV, returning the computed result.

    Reads ``file`` through the module-level Optimus instance ``op`` (created
    earlier with the "dask" engine). It splits ``OCCURRED_ON_DATE`` on "-"
    into three ``year_*`` columns while keeping the original column, drops
    ``YEAR``, and materializes the lazy frame with ``compute()``.

    Args:
        file: Path of the CSV to load. Defaults to the same file used in the
            interactive cells, so existing ``run_job()`` calls are unchanged.

    Returns:
        The result of ``df.compute()``. It is returned rather than discarded,
        so the flow's ``result`` and any downstream task can use it.
    """
    logger = prefect.context.get("logger")
    # NOTE(review): this task does not write anything yet; the message is
    # kept as-is, but it does not describe what the task actually does.
    logger.info("saving reference data...")
    df = op.load.csv(
        file,
        sep=",",
        error_bad_lines=False,
        header=True,
        null_value="null",
        infer_schema='true',
        charset="latin1",
    ).persist()
    df = df.cols.unnest("OCCURRED_ON_DATE", separator="-", splits=3, output_cols="year", drop=False)
    df = df.cols.drop("YEAR")
    # Previously the computed frame was thrown away and the task returned None.
    return df.compute()
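# Alternative: a daily clock starting 2019-01-01 in New York time
# (requires `import pendulum` and access to prefect.schedules.clocks):
# schedules.clocks.IntervalClock(
#     start_date=pendulum.datetime(2019, 1, 1, tz="America/New_York"),
#     interval=timedelta(days=1),
# )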
from datetime import timedelta

from prefect.engine.executors import LocalExecutor
from prefect.schedules import IntervalSchedule

# Trigger the flow every two minutes.
schedule = IntervalSchedule(interval=timedelta(minutes=2))

# Build the flow; calling run_job inside the context adds it as a task of the flow.
with Flow("Optimus ETL", schedule=schedule) as flow:
    result = run_job()

# Optional: render the task graph.
# flow.visualize()

# Run on the local executor. Because a schedule is attached, this call keeps
# waiting for the next scheduled run after each run instead of returning.
flow.run(executor=LocalExecutor())
[2020-03-23 03:35:39,488] INFO - prefect.Flow: Optimus ETL | Waiting for next scheduled run at 2020-03-23T03:36:00+00:00 [2020-03-23 03:36:00,001] INFO - prefect.FlowRunner | Beginning Flow run for 'Optimus ETL' [2020-03-23 03:36:00,008] INFO - prefect.FlowRunner | Starting flow run. [2020-03-23 03:36:00,025] INFO - prefect.TaskRunner | Task 'run_job': Starting task run... [2020-03-23 03:36:00,026] INFO - prefect.Task: run_job | saving reference data... [2020-03-23 03:36:02,276] INFO - prefect.TaskRunner | Task 'run_job': finished task run for task with final state: 'Success' [2020-03-23 03:36:02,277] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded [2020-03-23 03:36:02,278] INFO - prefect.Flow: Optimus ETL | Waiting for next scheduled run at 2020-03-23T03:38:00+00:00
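### Schedule
See the Prefect documentation on [complex schedules](https://docs.prefect.io/core/concepts/schedules.html#complex-schedules) for clocks, filters and other scheduling options.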