#!/usr/bin/env python
# coding: utf-8

# # Enabling Data Engineering Workflows
#
# With Automation API, choosing between small, decoupled units of infrastructure and a
# maintainable orchestration process is no longer necessary. Your Pulumi code now executes
# just like any other code in your application, meaning that it is possible to deploy your
# infrastructure and manage related activities all from within your notebook.
#
# In this notebook, we will set up an AWS Aurora SQL database and execute a database
# "migration" using the resulting connection info. For the migration, we will create a
# table, insert a few rows of data and then read the data back to verify the setup.

# ### Define your database
#
# Consider the following function called `rds_database`. It creates an AWS Aurora SQL
# cluster and database instance and exports the relevant connection information.

# In[ ]:

import pulumi
import pulumi_aws as aws


def rds_database():
    """Pulumi inline program: provision an Aurora MySQL cluster and instance.

    Creates a DB subnet group over the default VPC's subnets, a (deliberately
    wide-open, demo-only) security group, an Aurora MySQL cluster plus one
    instance, and exports ``host``, ``db_name``, ``db_user`` and ``db_pass``
    as stack outputs for the migration step below.
    """
    default_vpc = aws.ec2.get_vpc(default=True)
    public_subnet_ids = aws.ec2.get_subnet_ids(vpc_id=default_vpc.id)
    subnet_group = aws.rds.SubnetGroup("db_subnet", subnet_ids=public_subnet_ids.ids)

    # Make a public security group for our cluster for the migration.
    # NOTE(review): this allows all traffic from/to 0.0.0.0/0 — acceptable only
    # for a short-lived demo stack that is destroyed at the end of the notebook.
    security_group = aws.ec2.SecurityGroup(
        "public_group",
        ingress=[aws.ec2.SecurityGroupIngressArgs(
            protocol="-1",
            from_port=0,
            to_port=0,
            cidr_blocks=["0.0.0.0/0"],
        )],
        egress=[aws.ec2.SecurityGroupEgressArgs(
            protocol="-1",
            from_port=0,
            to_port=0,
            cidr_blocks=["0.0.0.0/0"],
        )],
    )

    # Example credentials only — you should change these before real use.
    db_name = "hellosql"
    db_user = "hellosql"
    db_pass = "hellosql"

    # Provision our DB.
    cluster = aws.rds.Cluster(
        "db",
        engine=aws.rds.EngineType.AURORA_MYSQL,
        engine_version="5.7.mysql_aurora.2.12.1",
        database_name=db_name,
        master_username=db_user,
        master_password=db_pass,
        skip_final_snapshot=True,
        db_subnet_group_name=subnet_group.name,
        vpc_security_group_ids=[security_group.id],
    )
    cluster_instance = aws.rds.ClusterInstance(
        "db_instance",
        cluster_identifier=cluster.cluster_identifier,
        instance_class=aws.rds.InstanceType.T3_SMALL,
        engine=aws.rds.EngineType.AURORA_MYSQL,
        engine_version="5.7.mysql_aurora.2.12.1",
        publicly_accessible=True,
        db_subnet_group_name=subnet_group.name,
    )

    pulumi.export("host", cluster.endpoint)
    pulumi.export("db_name", db_name)
    pulumi.export("db_user", db_user)
    # Mark the password as a secret so it is encrypted in the stack's state and
    # masked in CLI/UI display; stack.outputs()["db_pass"].value still yields
    # the plaintext below, so the migration code is unaffected.
    pulumi.export("db_pass", pulumi.Output.secret(db_pass))


# ### Deploy the infrastructure
#
# Now, let's go ahead and deploy our infrastructure in a Pulumi stack.

# In[ ]:

import json

from pulumi import automation as auto

# Create (or select, if one already exists) a stack that uses our inline program.
stack = auto.create_or_select_stack(
    stack_name="dev",
    project_name="database_migration",
    program=rds_database,
)
print("successfully initialized stack")

# For inline programs, we must manage plugins ourselves.
print("installing plugins...")
stack.workspace.install_plugin("aws", "v4.0.0")

# Set stack configuration specifying the AWS region to deploy into.
print("setting up config")
stack.set_config("aws:region", auto.ConfigValue(value="us-west-2"))

# Refresh first so the update plan is computed against actual cloud state.
print("refreshing stack...")
stack.refresh(on_output=print)
print("refresh complete")

print("updating stack...")
up_res = stack.up(on_output=print)
print(f"update summary: \n{json.dumps(up_res.summary.resource_changes, indent=4)}")

# ### Extract the outputs
#
# Let's get the important connection details out of the stack outputs.

# In[ ]:

outputs = stack.outputs()
host = outputs['host'].value
user = outputs['db_user'].value
password = outputs['db_pass'].value
database = outputs['db_name'].value
host, user, password, database

# ### Seed the database
#
# Alright, now we can configure the database and create our table.
# In[ ]:

from mysql.connector import connect

# Connect to the newly provisioned Aurora instance using the stack outputs
# extracted above (host/user/password/database).
connection = connect(host=host, user=user, password=password, database=database)

# int(9) is a display width, not a storage size — fine on Aurora MySQL 5.7.
create_table_query = """CREATE TABLE IF NOT EXISTS hello_pulumi( id int(9) NOT NULL PRIMARY KEY, color varchar(14) NOT NULL); """

with connection.cursor() as cursor:
    cursor.execute(create_table_query)
    connection.commit()

# Seed the table with some data to start.

# In[ ]:

# INSERT IGNORE makes the seed idempotent: re-running the notebook will not
# fail on duplicate primary keys.
seed_table_query = """INSERT IGNORE INTO hello_pulumi (id, color) VALUES (1, 'Purple'), (2, 'Violet'), (3, 'Plum'); """

with connection.cursor() as cursor:
    cursor.execute(seed_table_query)
    connection.commit()

# ### Verify the data
#
# Finally, let's read the data back to ensure that everything worked as expected.

# In[ ]:

read_table_query = """SELECT COUNT(*) FROM hello_pulumi;"""

with connection.cursor() as cursor:
    cursor.execute(read_table_query)
    print(f"Result: {cursor.fetchone()}")

# ### Clean up
#
# Now that we're done testing everything out, we can close our database connection
# and destroy and remove our stack.

# In[ ]:

# Close the DB connection before tearing down the infrastructure it points at.
connection.close()
stack.destroy(on_output=print)
stack.workspace.remove_stack("dev")
# (was an f-string with no placeholders — plain literal prints identical text)
print("stack successfully removed")