!mkdir companydata

#You may need to check the URLs
#http://download.companieshouse.gov.uk/en_output.html
#Download the data files and place them in the companydata directory
!wget http://download.companieshouse.gov.uk/BasicCompanyData-2014-06-01-part1_5.zip -P companydata
!wget http://download.companieshouse.gov.uk/BasicCompanyData-2014-06-01-part2_5.zip -P companydata
!wget http://download.companieshouse.gov.uk/BasicCompanyData-2014-06-01-part3_5.zip -P companydata
!wget http://download.companieshouse.gov.uk/BasicCompanyData-2014-06-01-part4_5.zip -P companydata
!wget http://download.companieshouse.gov.uk/BasicCompanyData-2014-06-01-part5_5.zip -P companydata

#Check the files are there...
!ls companydata

#Unzip the files into the same directory
!unzip "companydata/*.zip" -d companydata

#Should we tidy up further and delete the zip files too?
#Or maybe do this after we know everything has been loaded...
#!rm companydata/*.zip

#How many rows are there in each file?
!wc -l companydata/*.csv

#Let's have a quick peek at the head of one of the files
!head -n 3 companydata/BasicCompanyData-2014-06-01-part1_5.csv

#Let's see how pandas sees a file
import pandas as pd
tmp = pd.read_csv('companydata/BasicCompanyData-2014-06-01-part1_5.csv', nrows=3)
tmp.head(3)

!ls companydata

#Let's look at the column names
tmp.columns.values

#Deprecate this in favour of psqlx?
#import psycopg2 as pg
#If connecting to the default port, you can omit the port parameter
#conn = pg.connect(dbname='tm351test', host='localhost', user='test', password='test', port=5432)
#cur = conn.cursor()

import pandas as pd
pd.__version__

#Upgrade pandas if need be (the package name is assumed from the version check above)
#!pip3 install --upgrade pandas

import sys
sys.version
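#Before loading anything into the database, it can be worth checking how pandas infers the column types -
#company numbers, for example, can carry leading zeros that a numeric dtype would silently drop.
#A minimal sketch (reading a small sample twice for comparison; the filename is one of the files downloaded above):
sample_inferred = pd.read_csv('companydata/BasicCompanyData-2014-06-01-part1_5.csv', nrows=100)
sample_str = pd.read_csv('companydata/BasicCompanyData-2014-06-01-part1_5.csv', nrows=100, dtype=str)

#Compare the inferred dtypes with the all-string read
print(sample_inferred.dtypes.head(15))
print(sample_str.dtypes.head(15))

#Column names in the raw file may carry stray whitespace that will need stripping
print([c for c in sample_inferred.columns if c != c.strip()])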
#psql lets us return query results as pandas dataframes
from pandas import read_sql_query as psql
#psqlx is a convenience function for executing raw queries where we aren't so bothered about the response...
#Errors should still be thrown...
from pandas.io.sql import execute as psqlx

import pandas as pd
from sqlalchemy import create_engine

#Create the database connection for pandas dataframe operations
engine = create_engine('postgresql://test:test@localhost:5432/tm351test')

#cur.execute("DROP TABLE companydata;")
#psqlx("DROP TABLE companydata;", engine)
#cur.execute("DROP TABLE company_postcode_area;")
#psqlx("DROP TABLE company_postcode_area;", engine)
#cur.execute("DROP INDEX company_postcode_area_idx;")
#psqlx("DROP INDEX company_postcode_area_idx;", engine)

#To define the table, let's check the longest strings in each column
#Note that this routine may take a few minutes to run
cleanColNames = ["CompanyName", "RegAddress.CareOf", "RegAddress.POBox",
                 "RegAddress.AddressLine1", "RegAddress.AddressLine2", "RegAddress.PostTown", "RegAddress.County",
                 "RegAddress.Country", "RegAddress.PostCode", "CompanyCategory", "CompanyStatus",
                 "CountryOfOrigin"]

def colMaxLen(df, maxlens):
    df.rename(columns=lambda x: x.strip(), inplace=True)
    for col in cleanColNames:
        cml = df[col].astype(str).map(len).max()
        if cml > maxlens[col]:
            maxlens[col] = cml

import os

maxlens = {}
for name in cleanColNames:
    maxlens[name] = 0

for fname in os.listdir('companydata/'):
    #If a file is a CSV file, process it
    if fname.endswith('csv'):
        fname = "companydata/" + fname
        #Read in 10,000 rows at a time
        chunks = pd.read_csv(fname, chunksize=10000)
        for chunk in chunks:
            colMaxLen(chunk, maxlens)

maxlens

#I'm going to create a table over some of the columns - essentially, the company name, number, address and incorporation date
#We can use maxlens to make sure we create large enough data fields
#I wonder if we should just try to create the table from the dataframe?
#Perhaps make sure we cast the right type for each column in the dataframe first?
#It's easier using psycopg2 to just run the query (will sqlalchemy let us run a "raw" query? Trying psqlx over cur.execute)
#cur.execute
psqlx("CREATE TABLE companydata ( CompanyName VARCHAR (200), CompanyNumber VARCHAR (10) PRIMARY KEY, \
       RegAddress_CareOf VARCHAR (100), RegAddress_POBox VARCHAR (15), RegAddress_AddressLine1 VARCHAR (120), \
       RegAddress_AddressLine2 VARCHAR (100), RegAddress_PostTown VARCHAR (60), \
       RegAddress_County VARCHAR (60), RegAddress_Country VARCHAR (60), RegAddress_PostCode VARCHAR (20), \
       CompanyCategory VARCHAR (100), CompanyStatus VARCHAR (60), CountryOfOrigin VARCHAR (40), \
       IncorporationDate date);", engine)

#Even though I used mixed case column names, I didn't quote them - so PostgreSQL forces them to lower case
#The lack of quotes also meant I had to replace the . in the column names with something more acceptable... (an underscore)
psql("SELECT * FROM companydata LIMIT 1;", engine)

companydata_cols = ['companyname', 'companynumber', 'regaddress_careof', 'regaddress_pobox',
                    'regaddress_addressline1', 'regaddress_addressline2', 'regaddress_posttown',
                    'regaddress_county', 'regaddress_country', 'regaddress_postcode',
                    'companycategory', 'companystatus', 'countryoforigin', 'incorporationdate']
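#As an aside on the "create the table from the dataframe" question above: pandas' to_sql can create the table itself,
#and more recent pandas versions accept a dtype mapping of explicit SQLAlchemy types, so the VARCHAR lengths could be
#driven by maxlens. A rough, untried sketch - the extra padding is an arbitrary choice, and the call is left commented
#out because the table has already been created above:
from sqlalchemy.types import String, Date

sqltypes = {'companyname': String(maxlens['CompanyName'] + 10),
            'regaddress_postcode': String(maxlens['RegAddress.PostCode'] + 5),
            'incorporationdate': Date()}

#chunk[companydata_cols].to_sql('companydata', engine, index=False, if_exists='fail', dtype=sqltypes)

#One reason for sticking with the explicit CREATE TABLE is that to_sql won't declare CompanyNumber as a PRIMARY KEY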
def addchunk(chunk):
    '''Clean the companydata from the CSV file and enter it into the database table'''
    #NOTE: IF WE HAVE MORE TABLES IT MIGHT BE BEST TO PROCESS THEM ALL FROM A SINGLE FILE READ?

    #To conform to the column names in the table I defined, we need to make some changes to the column names
    #Firstly, replace the points with underscores
    chunk.rename(columns=lambda x: x.replace('.', '_'), inplace=True)
    #Make sure we strip off any whitespace left and right
    chunk.rename(columns=lambda x: x.strip(), inplace=True)
    #Force the column names to lower case
    chunk.rename(columns=lambda x: x.lower(), inplace=True)

    #This adds to the load time but it's often worth doing
    for col in companydata_cols:
        chunk[col] = chunk[col].str.strip()

    #We could have parsed the dates on load, but if we do it here we can keep all the cleaning steps together
    chunk['incorporationdate'] = pd.to_datetime(chunk['incorporationdate'], format='%d/%m/%Y', errors='coerce')

    #I'm actually only interested in companies with an incorporation date
    chunk.dropna(subset=['incorporationdate'], inplace=True)
    #NB PostgreSQL may also throw a wobbly if it doesn't see a date when it expects one!

    #We're not saving all the columns - just name, number, address and incorporation date, as per the table definition
    #If the table exists, which it does, append the data to it
    chunk[companydata_cols].to_sql('companydata', engine, index=False, if_exists='append')

#Here's our ingest loop
#THIS COULD TAKE SOME TIME TO RUN - MAYBE EVEN AN HOUR OR MORE...
#Maybe this is an argument in favour of having several passes for different tables?

#Get the names of the files in the companydata directory
for fname in os.listdir('companydata/'):
    #If a file is a CSV file, process it
    if fname.endswith('csv'):
        fname = "companydata/" + fname
        #Read in 10,000 rows at a time, keeping everything as strings
        chunks = pd.read_csv(fname, chunksize=10000, dtype=str)
        for chunk in chunks:
            #Call the function that cleans the data and adds it to the database
            addchunk(chunk)
        #Do some housekeeping - remove the CSV
        os.remove(fname)

#How many rows did we load in the end?
psql("SELECT count(*) FROM companydata;", engine)

psql("SELECT * FROM companydata LIMIT 5;", engine)

#cur.execute("CREATE TABLE company_postcode_area AS \
#    SELECT companynumber, split_part(regaddress_postcode,' ',1) as postcode_area FROM companydata;")
psqlx("CREATE TABLE company_postcode_area AS \
       SELECT companynumber, split_part(regaddress_postcode,' ',1) as postcode_area FROM companydata;", engine)

#cur.execute("CREATE INDEX company_postcode_area_idx ON company_postcode_area (postcode_area);")
psqlx("CREATE INDEX company_postcode_area_idx ON company_postcode_area (postcode_area);", engine)

mkCos = psql("SELECT * FROM company_postcode_area \
              WHERE postcode_area IN ('MK1','MK2','MK3','MK4','MK5','MK6','MK7','MK8','MK9','MK10','MK11','MK12','MK13','MK14','MK15');", engine)
mkCos.groupby('postcode_area').companynumber.agg(len).sort_values(ascending=False)

psql("SELECT postcode_area, COUNT(*) FROM company_postcode_area \
      WHERE postcode_area IN ('MK1','MK2','MK3','MK4','MK5','MK6','MK7','MK8','MK9','MK10','MK11','MK12','MK13','MK14','MK15') \
      GROUP BY postcode_area;", engine)

psql("SELECT regaddress_postcode, COUNT(*) FROM companydata \
      GROUP BY regaddress_postcode ORDER BY count DESC LIMIT 20;", engine)

psql("SELECT regaddress_postcode FROM companydata WHERE regaddress_postcode LIKE 'BS9%%' LIMIT 20;", engine)
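#To eyeball the Milton Keynes distribution, the grouped counts can be charted directly from the mkCos dataframe.
#A quick sketch - it assumes matplotlib is installed and that pandas' default bar chart is good enough here:
%matplotlib inline

mk_counts = mkCos.groupby('postcode_area').companynumber.agg(len).sort_values(ascending=False)
mk_counts.plot(kind='bar', title='Registered companies by MK postcode area')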