#!/usr/bin/env python
# coding: utf-8

# # Combined DFO_OPDB and PRISM databases
# - add N, Si, and ancillary data from DFO and from PRISM
# - check for duplicate entries

# In[1]:

# imports
# (duplicate `from sqlalchemy import case` and `import matplotlib.colors as col`
#  lines removed; everything else kept since later cells may rely on them)
import sqlalchemy
from sqlalchemy import create_engine, Column, String, Integer, Numeric, MetaData, Table, type_coerce, ForeignKey, case
from sqlalchemy.orm import mapper, create_session, relationship, aliased, Session
from sqlalchemy.ext.declarative import declarative_base
import csv
import numpy as np
from sqlalchemy.ext.automap import automap_base
import matplotlib.pyplot as plt
import sqlalchemy.types as types
import numbers
from sqlalchemy.sql import and_, or_, not_, func
from sqlalchemy.sql import select
import os
import glob
import re
from os.path import isfile
from mpl_toolkits.basemap import Basemap, shiftgrid, cm
import gsw
import matplotlib.cm as cmm
import matplotlib.colors as col
import createDBfromDFO_OPDB
from netCDF4 import Dataset
import datetime as dt
import dateutil as dutil
# `import dateutil` alone does NOT import the parser submodule, so the
# later `dutil.parser.parse(...)` call would raise AttributeError unless
# some other library happened to import it first. Import it explicitly.
import dateutil.parser
get_ipython().run_line_magic('matplotlib', 'inline')


# In[2]:

# definitions: source databases and the combined output database (all sqlite)
basepath = '/ocean/eolson/MEOPAR/obs/'
dbnameOPDB = 'DFOOPDB/DFO_OcProfDB'
dbnamePRISM = 'NANOOS_PRISMCRUISES/PRISM'
dbnameComb = 'combinedForICs'


# ## First, create database to hold combined data:

# In[3]:

engine = create_engine('sqlite:///' + basepath + dbnameComb + '.sqlite')
Base = declarative_base()


class forceNumeric(types.TypeDecorator):
    """Numeric column type that stores NULL for values not parseable as numbers."""
    impl = types.Numeric

    def process_bind_param(self, value, dialect):
        # Coerce anything that float() cannot parse (including None) to None
        # (SQL NULL) rather than failing the insert.  Narrowed from a bare
        # `except:` to the exceptions float()/int() actually raise.
        try:
            int(float(value))
        except (TypeError, ValueError):
            value = None
        return value


class forceInt(types.TypeDecorator):
    """Integer column type that stores NULL for values not parseable as ints."""
    impl = types.Integer

    def process_bind_param(self, value, dialect):
        try:
            int(value)
        except (TypeError, ValueError):
            value = None
        return value


# In[4]:

# define Table Classes:
class StationTBL(Base):
    """One station (cast location/date) in the combined database.

    sourceStaID/sourceDB record where the station came from so the original
    record can be traced back.
    """
    __table__ = Table('StationTBL', Base.metadata,
                      Column('ID', Integer, primary_key=True),
                      Column('Lat', forceNumeric),
                      Column('Lon', forceNumeric),
                      Column('Year', forceInt),
                      Column('Month', forceInt),
                      Column('Day', forceInt),
                      Column('sourceStaID', forceNumeric),
                      Column('sourceDB', String),
                      Column('sourceStaName', String))


class ObsTBL(Base):
    """One bottle/CTD observation in the combined database.

    gswSA/gswTC/gswRho are TEOS-10 absolute salinity, conservative
    temperature and density computed with the gsw package at load time.
    """
    __table__ = Table('ObsTBL', Base.metadata,
                      Column('ID', Integer, primary_key=True),
                      Column('StationTBLID', forceInt, ForeignKey('StationTBL.ID')),
                      Column('sourceObsID', Integer),
                      Column('sourceStaID', Integer),
                      Column('sourceDB', String),
                      Column('Z', forceNumeric),
                      Column('Press', forceNumeric),
                      Column('T', forceNumeric),
                      Column('gswTC', forceNumeric),
                      Column('S', forceNumeric),
                      Column('gswSA', forceNumeric),
                      Column('gswRho', forceNumeric),
                      Column('NO3', forceNumeric),
                      Column('NH4', forceNumeric),
                      Column('Si', forceNumeric),
                      Column('PON', forceNumeric),
                      Column('DON', forceNumeric),
                      Column('bSi', forceNumeric))
    station = relationship(StationTBL,
                           primaryjoin=__table__.c.StationTBLID == StationTBL.ID)


# In[5]:

Base.metadata.create_all(engine)
session = create_session(bind=engine, autocommit=False, autoflush=True)


# ## Next, open DFO_OPDB database and extract data:

# In[6]:

Base1 = automap_base()
engine1 = create_engine('sqlite:///' + basepath + dbnameOPDB + '.sqlite', echo=False)
# reflect the tables:
Base1.prepare(engine1, reflect=True)
Sta1 = Base1.classes.StationTBL
Obs1 = Base1.classes.ObsTBL
session1 = create_session(bind=engine1, autocommit=False, autoflush=True)


# ### definitions:
# NOTE: the `!= None` comparisons below are on SQLAlchemy column expressions,
# where they compile to SQL `IS NOT NULL`; they are intentional and must not
# be replaced with `is not None`.

# In[7]:

# Pressure: prefer the primary sensor, fall back to the reversing thermometer value.
Press1 = case([(Obs1.Pressure != None, Obs1.Pressure)],
              else_=Obs1.Pressure_Reversing)

# In[8]:

# Temperature: first non-NULL of Temperature, Primary, Secondary, Reversing.
Tem1 = case([(Obs1.Temperature != None, Obs1.Temperature)], else_=
       case([(Obs1.Temperature_Primary != None, Obs1.Temperature_Primary)], else_=
       case([(Obs1.Temperature_Secondary != None, Obs1.Temperature_Secondary)],
            else_=Obs1.Temperature_Reversing)))

# In[9]:

# Salinity: first non-NULL of Bottle, T0_C0, T1_C1, Salinity, Unknown, Pre1978,
# with a parallel cascade selecting the matching quality flag.
Sal1 = case([(Obs1.Salinity_Bottle != None, Obs1.Salinity_Bottle)], else_=
       case([(Obs1.Salinity_T0_C0 != None, Obs1.Salinity_T0_C0)], else_=
       case([(Obs1.Salinity_T1_C1 != None, Obs1.Salinity_T1_C1)], else_=
       case([(Obs1.Salinity != None, Obs1.Salinity)], else_=
       case([(Obs1.Salinity__Unknown != None, Obs1.Salinity__Unknown)],
            else_=Obs1.Salinity__Pre1978)))))
Sal1Flag = case([(Obs1.Salinity_Bottle != None, Obs1.Flag_Salinity_Bottle)], else_=
           case([(Obs1.Salinity_T0_C0 != None, Obs1.Flag_Salinity)], else_=
           case([(Obs1.Salinity_T1_C1 != None, Obs1.Flag_Salinity)], else_=
           case([(Obs1.Salinity != None, Obs1.Flag_Salinity)], else_=
           case([(Obs1.Salinity__Unknown != None, Obs1.Flag_Salinity)],
                else_=Obs1.Quality_Flag_Sali)))))
# NULL out salinity values whose quality flag is worse than 2 (see flag key below)
Sal1_noflag = case([(Sal1Flag > 2, None)], else_=Sal1)

# In[10]:

NO1 = case([(Obs1.Nitrate_plus_Nitrite != None, Obs1.Nitrate_plus_Nitrite)],
           else_=Obs1.Nitrate)
NO1Flag = case([(Obs1.Nitrate_plus_Nitrite != None, Obs1.Flag_Nitrate_plus_Nitrite)],
               else_=Obs1.Flag_Nitrate)
# Obs.Quality_Flag_Nitr does not match any nitrate obs
# ISUS not included in this NO
# units are micromolar (muM)
NO1_noflag = case([(NO1Flag > 2, None)], else_=NO1)

# In[11]:

NH1_noflag = case([(Obs1.Flag_Ammonium > 2, None)], else_=Obs1.Ammonium)

# In[12]:

Si1_noflag = case([(Obs1.Flag_Silicate > 2, None)], else_=Obs1.Silicate)

# In[13]:

DON_noflag = case([(Obs1.Flag_Nitrogen_Dissolved_Organic > 2, None)],
                  else_=Obs1.Nitrogen_Dissolved_Organic)

# In[14]:

PON_noflag = Obs1.Nitrogen_Particulate_Organic

# do not include data with quality flag 3 or greater.
# from 2011-27-0020.che:
# Flag channels initialized with zeros. Non-zero values have the following significance:
# --------------------------------------------------------------------------------------
# 1 = Sample for this measurement was drawn from water bottle but not analyzed
#     (not normally used).
# 2 = Acceptable measurement (not normally used).
# 3 = Questionable measurement (no problem observed in sampling or analysis,
#     but value is not trusted, nonetheless; includes outlyers).
# 4 = Bad measurement (known problem with sampling or analysis, but not
#     serious enough to completely discard the value).
# 5 = Not reported (lost sample; unredeemably bad measurement).
# 6 = Mean of replicate measurements.
# 7 = Manual chromatographic peak measurement.
# 8 = Irregular digital chromatographic peak integration.
# 9 = Sample not drawn for this measurement from this bottle (not normally used).

# ### start with OBS table:

# In[15]:

# Select every DFO observation that has at least one nutrient value after
# flag screening, along with station position for the gsw calculations.
qry1_Obs = session1.query(Obs1.ID, Obs1.StationTBLID, Obs1.Depth, Press1, Tem1,
                          Sal1_noflag, NO1_noflag, NH1_noflag, Si1_noflag,
                          DON_noflag, PON_noflag, Sta1.Lon, Sta1.Lat).\
    select_from(Obs1).\
    join(Sta1, Sta1.ID == Obs1.StationTBLID).\
    filter(or_(NO1_noflag != None,
               NH1_noflag != None,
               Si1_noflag != None,
               DON_noflag != None,
               PON_noflag != None))

# In[16]:

jj = 0
for iSourceID, iSourceStaID, iZ, iP, iT, iS, iNO, iNH, iSi, iDON, iPON, iLon, iLat \
        in qry1_Obs.all():
    jj += 1
    idict = {}
    idict['sourceObsID'] = iSourceID
    idict['sourceStaID'] = iSourceStaID
    idict['sourceDB'] = 'DFO_OPDB'
    idict['Z'] = iZ
    idict['Press'] = iP
    idict['T'] = iT
    idict['S'] = iS
    idict['NO3'] = iNO
    idict['NH4'] = iNH
    idict['Si'] = iSi
    idict['PON'] = iPON
    idict['DON'] = iDON
    # compute TEOS-10 quantities where S, P and position are all available
    # (`is not None` here: these are plain Python values from the query result)
    if iS is not None and iP is not None and iLon is not None and iLat is not None:
        iSA = gsw.SA_from_SP(iS, iP, iLon, iLat)
        idict['gswSA'] = iSA
        if iT is not None:
            idict['gswRho'] = gsw.rho(iSA, iT, iP)
            idict['gswTC'] = gsw.CT_from_t(iSA, iT, iP)
    # enter in new db Obs table:
    session.execute(ObsTBL.__table__.insert().values(**idict))
    if jj % 5000 == 1:
        print(idict)


# ### add data to StationTBL

# In[17]:

# one row per station that contributed at least one kept observation
qry1_Sta = session1.query(Sta1.ID, Sta1.Lat, Sta1.Lon, Sta1.StartYear,
                          Sta1.StartMonth, Sta1.StartDay, Sta1.STATION).\
    select_from(Obs1).\
    join(Sta1, Sta1.ID == Obs1.StationTBLID).\
    filter(or_(NO1_noflag != None,
               NH1_noflag != None,
               Si1_noflag != None,
               DON_noflag != None,
               PON_noflag != None)).group_by(Sta1.ID)

# In[18]:

jj = 0
for iID, iLat, iLon, iYr, iMo, iDy, iSN in qry1_Sta.all():
    jj += 1
    idict = {}
    idict['Lat'] = iLat
    idict['Lon'] = iLon
    idict['Year'] = iYr
    idict['Month'] = iMo
    idict['Day'] = iDy
    idict['sourceStaID'] = iID
    idict['sourceDB'] = 'DFO_OPDB'
    idict['sourceStaName'] = iSN
    session.execute(StationTBL.__table__.insert().values(**idict))
    if jj % 1000 == 1:
        print(idict)


# ### update ObsTBL to have correct foreign keys based on DFO_OPDB relationships

# In[19]:

jj = 0
for iObs in session.query(ObsTBL).all():
    jj += 1
    if jj % 5000 == 1:
        print(iObs.ID, iObs.StationTBLID, iObs.sourceStaID)
    # rewrite the source station id with the new combined-db StationTBL.ID
    iObs.StationTBLID = session.query(StationTBL.ID).\
        filter(StationTBL.sourceStaID == iObs.sourceStaID).one()[0]
    if jj % 5000 == 1:
        print(iObs.ID, iObs.StationTBLID, iObs.sourceStaID)

# In[20]:

session.commit()
session1.close()
engine1.dispose()


# ## Next, load PRISM data:

# In[21]:

Base2 = automap_base()
engine2 = create_engine('sqlite:///' + basepath + dbnamePRISM + '.sqlite', echo=False)
# reflect the tables:
Base2.prepare(engine2, reflect=True)
Obs2 = Base2.classes.ObsTBL
Cast2 = Base2.classes.CastTBL
session2 = create_session(bind=engine2, autocommit=False, autoflush=True)

# In[22]:

# PRISM stores one row per (cast, depth, variable); collapse to one row per
# (cast, depth) that has at least one nutrient variable.
qry2_0 = session2.query(Obs2.id, Obs2.cast_dbid, Obs2.depth,
                        Obs2.cast_lat.label('lat'),
                        Obs2.cast_lon.label('lon')).\
    filter(or_(Obs2.variable == 'ammonium_concentration',
               Obs2.variable == 'nitrate_concentration',
               Obs2.variable == 'silicate_concentration')).subquery()
qry2_ObsBase = session2.query(qry2_0.c.id, qry2_0.c.cast_dbid, qry2_0.c.depth,
                              qry2_0.c.lat, qry2_0.c.lon).\
    group_by(qry2_0.c.cast_dbid, qry2_0.c.depth).subquery()

# In[23]:

test = session2.query(qry2_ObsBase.c.id, qry2_ObsBase.c.cast_dbid,
                      qry2_ObsBase.c.depth, qry2_ObsBase.c.lat,
                      qry2_ObsBase.c.lon).limit(5).all()
for row in test:
    print(row)


# ### some variables have multiple entires at a depth and station; take the average:

# In[24]:

qNO = session2.query(Obs2.cast_dbid, Obs2.depth,
                     func.avg(Obs2.value).label('value')).\
    filter(Obs2.variable == 'nitrate_concentration').\
    group_by(Obs2.cast_dbid, Obs2.depth).subquery()
qNH = session2.query(Obs2.cast_dbid, Obs2.depth,
                     func.avg(Obs2.value).label('value')).\
    filter(Obs2.variable == 'ammonium_concentration').\
    group_by(Obs2.cast_dbid, Obs2.depth).subquery()
qSi = session2.query(Obs2.cast_dbid, Obs2.depth,
                     func.avg(Obs2.value).label('value')).\
    filter(Obs2.variable == 'silicate_concentration').\
    group_by(Obs2.cast_dbid, Obs2.depth).subquery()
# averaged ancillary variables per (cast, depth), same pattern as the nutrients
qP = session2.query(Obs2.cast_dbid, Obs2.depth,
                    func.avg(Obs2.value).label('value')).\
    filter(Obs2.variable == 'water_pressure').\
    group_by(Obs2.cast_dbid, Obs2.depth).subquery()
qT = session2.query(Obs2.cast_dbid, Obs2.depth,
                    func.avg(Obs2.value).label('value')).\
    filter(Obs2.variable == 'water_temperature').\
    group_by(Obs2.cast_dbid, Obs2.depth).subquery()
qS = session2.query(Obs2.cast_dbid, Obs2.depth,
                    func.avg(Obs2.value).label('value')).\
    filter(Obs2.variable == 'water_salinity').\
    group_by(Obs2.cast_dbid, Obs2.depth).subquery()

# In[25]:

# outer-join each averaged variable back onto the (cast, depth) base rows so
# missing variables come through as NULL rather than dropping the row
qry2_all = session2.query(qry2_ObsBase.c.id, qry2_ObsBase.c.lat,
                          qry2_ObsBase.c.lon, qry2_ObsBase.c.cast_dbid,
                          qry2_ObsBase.c.depth, qNO.c.value, qNH.c.value,
                          qSi.c.value, qP.c.value, qT.c.value, qS.c.value).\
    outerjoin(qNO, and_(qNO.c.cast_dbid == qry2_ObsBase.c.cast_dbid,
                        qNO.c.depth == qry2_ObsBase.c.depth)).\
    outerjoin(qNH, and_(qNH.c.cast_dbid == qry2_ObsBase.c.cast_dbid,
                        qNH.c.depth == qry2_ObsBase.c.depth)).\
    outerjoin(qSi, and_(qSi.c.cast_dbid == qry2_ObsBase.c.cast_dbid,
                        qSi.c.depth == qry2_ObsBase.c.depth)).\
    outerjoin(qP, and_(qP.c.cast_dbid == qry2_ObsBase.c.cast_dbid,
                       qP.c.depth == qry2_ObsBase.c.depth)).\
    outerjoin(qT, and_(qT.c.cast_dbid == qry2_ObsBase.c.cast_dbid,
                       qT.c.depth == qry2_ObsBase.c.depth)).\
    outerjoin(qS, and_(qS.c.cast_dbid == qry2_ObsBase.c.cast_dbid,
                       qS.c.depth == qry2_ObsBase.c.depth))

# In[26]:

# sanity check: joined query should have the same row count as the base query
test = qry2_all.count()
print(test)

# In[27]:

test2 = session2.query(qry2_ObsBase).count()
print(test2)

# In[28]:

jj = 0
for iID, iLat, iLon, iCastID, iZ, iNO, iNH, iSi, iP, iT, iS in qry2_all.all():
    jj += 1
    idict = {}
    idict['sourceObsID'] = iID
    idict['sourceStaID'] = iCastID
    idict['sourceDB'] = 'PRISM'
    idict['Z'] = iZ
    idict['T'] = iT
    idict['S'] = iS
    idict['NO3'] = iNO
    idict['NH4'] = iNH
    idict['Si'] = iSi
    idict['Press'] = iP
    # compute TEOS-10 quantities where S, P and position are all available
    # (`is not None` here: plain Python values, not SQL expressions)
    if iS is not None and iP is not None and iLon is not None and iLat is not None:
        iSA = gsw.SA_from_SP(iS, iP, iLon, iLat)
        idict['gswSA'] = iSA
        if iT is not None:
            idict['gswRho'] = gsw.rho(iSA, iT, iP)
            idict['gswTC'] = gsw.CT_from_t(iSA, iT, iP)
    # enter in new db Obs table:
    session.execute(ObsTBL.__table__.insert().values(**idict))
    if jj % 5000 == 1:
        print(idict)

# In[29]:

# one row per PRISM cast that contributed observations
qry2_Cast = session2.query(Cast2.cast_dbid, Cast2.cast_datetime, Cast2.cast_lat,
                           Cast2.cast_lon, Cast2.station_name).\
    join(qry2_ObsBase, qry2_ObsBase.c.cast_dbid == Cast2.cast_dbid).\
    group_by(Cast2.cast_dbid)

# In[30]:

print(qry2_Cast.count())

# In[31]:

qtest = session2.query(Cast2).count()
print(qtest)

# In[32]:

# `import dateutil as dutil` alone does not make dutil.parser available
# (dateutil does not auto-import its parser submodule), so import it here.
from dateutil import parser as date_parser

jj = 0
for iID, iDT, iLat, iLon, iSN in qry2_Cast.all():
    jj += 1
    idict = {}
    # cast_datetime is stored as a string; parse it to split out Y/M/D
    idate = date_parser.parse(iDT)
    idict['Lat'] = iLat
    idict['Lon'] = iLon
    idict['Year'] = idate.year
    idict['Month'] = idate.month
    idict['Day'] = idate.day
    idict['sourceStaID'] = iID
    idict['sourceDB'] = 'PRISM'
    idict['sourceStaName'] = iSN
    session.execute(StationTBL.__table__.insert().values(**idict))
    if jj % 100 == 1:
        print(idict)

# In[33]:

# rewrite PRISM observation foreign keys to the combined-db StationTBL.ID;
# restrict both sides to sourceDB=='PRISM' so DFO ids cannot collide
jj = 0
for iObs in session.query(ObsTBL).filter(ObsTBL.sourceDB == 'PRISM').all():
    jj += 1
    if jj % 1000 == 1:
        print(iObs.ID, iObs.StationTBLID, iObs.sourceStaID)
    iObs.StationTBLID = session.query(StationTBL.ID).\
        filter(StationTBL.sourceStaID == iObs.sourceStaID,
               StationTBL.sourceDB == 'PRISM').one()[0]
    if jj % 1000 == 1:
        print(iObs.ID, iObs.StationTBLID, iObs.sourceStaID)

# In[34]:

session.commit()
session2.close()
engine2.dispose()


# ## Check for duplicates (inspect entries with same lat, lon, date, depth)
# - first, check that longitudes have same sign for both databases:

# In[35]:

qtest = session.query(func.max(StationTBL.Lon)).group_by(StationTBL.sourceDB).all()
for row in qtest:
    print(row)

# - next, check that no stations are duplicated between the different datasets
#   that were combined by comparing Year, Month, Day, Lat, Lon

# In[36]:

a1 = aliased(StationTBL)
a2 = aliased(StationTBL)

# In[44]:

# self-join on same date and position within 0.001 degrees, different source DB
qDupSta = session.query(a1.ID.label('ID1'), a2.ID.label('ID2'), a1.Lat, a1.Lon,
                        a1.Year, a1.Month, a1.Day, a1.sourceDB.label('DB1'),
                        a2.sourceDB.label('DB2')).\
    select_from(a1).\
    join(a2, and_(a1.Year == a2.Year,
                  a1.Month == a2.Month,
                  a1.Day == a2.Day,
                  a1.Lat - a2.Lat < 0.001,
                  a1.Lat - a2.Lat > -0.001,
                  a1.Lon - a2.Lon < 0.001,
                  a1.Lon - a2.Lon > -0.001,
                  a1.ID != a2.ID,
                  a1.sourceDB != a2.sourceDB))
# assume there were no duplicates in the original databases

# In[45]:

for row in qDupSta.all():
    print(row)

# In[46]:

session.close()
engine.dispose()

# In[48]:

# load model thalweg grid indices (integer i,j pairs) for later use
indthw = np.loadtxt('/ocean/eolson/MEOPAR/tools/bathymetry/thalweg_working.txt',
                    delimiter=" ", unpack=False)
indthw = indthw.astype(int)

# In[ ]: