#!/usr/bin/env python
# coding: utf-8
"""Notebook-style walk-through of the SQLAlchemy ORM on in-memory SQLite.

The script is a linear sequence of notebook cells (marked ``# In[n]:``).
It declares ``User`` and ``Address`` models, inserts and queries rows,
demonstrates joins, aliases, subqueries, EXISTS and eager loading, then
re-declares the models with delete cascades and finishes with a
many-to-many ``BlogPost`` <-> ``Keyword`` example.

Bare expressions (for example ``ed_user.name``) are kept from the notebook,
where they displayed a value; in a plain script they are evaluated and the
result is discarded.
"""

# In[1]:

import sqlalchemy
sqlalchemy.__version__

# In[2]:

from sqlalchemy import create_engine
engine = create_engine('sqlite:///:memory:', echo=True)

# In[3]:

from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

# In[4]:

from sqlalchemy import Column, Integer, String


class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column(String)

    def __repr__(self):
        """Return a readable summary of the user's column values."""
        # The format template must contain one placeholder per argument;
        # an empty template raises TypeError as soon as the object is printed.
        return "<User(name='%s', fullname='%s', password='%s')>" % (
            self.name, self.fullname, self.password)

# In[5]:

# Create a Schema

# In[6]:

User.__table__

# In[7]:

Base.metadata

# In[8]:

Base.metadata.create_all(engine)

# In[9]:

ed_user = User(name='ed', fullname='Ed Jones', password='edspassword')
ed_user.name

# In[10]:

ed_user.password

# In[11]:

str(ed_user.id)

# In[12]:

from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)

# In[13]:

session = Session()

# In[14]:

session.add(ed_user)

# In[15]:

our_user = session.query(User).filter_by(name='ed').first()
our_user

# In[16]:

user2 = User(name='Sara', fullname='Sara Jones', password='saraspassword')

# In[17]:

session.add(user2)

# In[18]:

session.add_all([
    User(name='wendy', fullname='Wendy Williams', password='foobar'),
    User(name='mary', fullname='Mary Contrary', password='xxg527'),
    User(name='fred', fullname='Fred Flinstone', password='blah'),
])

# In[19]:

ed_user.password = 'f8s7ccs'

# In[22]:

ed_user is our_user

# In[23]:

our_user.password = 'test'

# In[24]:

session.dirty

# In[21]:

session.new

# In[22]:

session.commit()

# In[23]:

ed_user.id

# In[24]:

user2.id

# In[25]:

ed_user.name = 'Edwardo'
fake_user = User(name='fakeuser', fullname='Invalid', password='password')
session.add(fake_user)

# In[26]:

session.query(User).filter(User.name.in_(['Edwardo', 'fakeuser'])).all()

# In[27]:

session.dirty

# In[28]:

session.rollback()
ed_user.name

# In[29]:

session.query(User).filter(User.name.in_(['ed', 'fakeuser'])).all()

# In[30]:

# Querying

# In[31]:

for instance in session.query(User).order_by(User.id):
    print(instance.name, instance.fullname)

# In[32]:

for name, fullname in session.query(User.name, User.fullname):
    print(name, fullname)

# In[33]:

for row in session.query(User, User.name).all():
    print(row.User, row.User.id, row.name)

# In[34]:

for row in session.query(User.name.label('name_label')).all():
    print(row.name_label)

# In[35]:

from sqlalchemy.orm import aliased
user_alias = aliased(User, name='user_alias')

for row in session.query(user_alias, user_alias.name).all():
    print(row.user_alias, row.name)

# In[36]:

for u in session.query(User).order_by(User.id)[1:3]:
    print(u.id)

# In[37]:

for name, in session.query(User.name).filter_by(fullname='Ed Jones'):
    print(name)

# In[38]:

for user in (session.query(User)
             .filter(User.name == 'ed')
             .filter(User.fullname == 'Ed Jones')):
    print(user)

# In[39]:

# Common Filter Operators
# ==
# !=
# .like(), .like('%ed%')
# .in_([])  .in_(['ed', 'wendy','jack'])
# Not in: query.filter(~User.name.in_(['ed','wendy','jack']))
# is null: == None
# is not null: != None
# and_()
# or_()
# .match()  *not available on SQLite

# In[40]:

for user in session.query(User).filter(User.name.in_(
        session.query(User.name).filter(User.name.like('%ed%'))
)):
    print(user)

# In[41]:

# Returning Lists and Scalars
# all() returns list
# first() return first result as scalar
# one() returns error unless result contains exactly one row
# one_or_none()
# scalar() invokes 1 and if successful returns first column

# In[42]:

from sqlalchemy.orm.exc import MultipleResultsFound
try:
    user = session.query(User).filter(User.name.like('%ed%')).one()
except MultipleResultsFound as e:
    print(e)

# In[43]:

query = session.query(User.id).filter(User.name == 'ed').order_by(User.id)
query.scalar()

# In[44]:

# Using Textual SQL

# In[45]:

from sqlalchemy import text
for user in session.query(User).filter(text("id<4")).order_by(text("id")).all():
    print(user.id, user.name)

# In[46]:

session.query(User).filter(text("id<:value and name=:name")).params(
    value=224, name='fred').order_by(User.id).one()

# In[47]:

session.query(User).from_statement(
    text("SELECT * FROM users where name=:name")).params(name='ed').all()

# In[48]:

# Counting
session.query(User).filter(User.name.like('%ed')).count()

# In[49]:

from sqlalchemy import func
session.query(func.count(User.name), User.name).group_by(User.name).all()

# In[50]:

session.query(func.count('*')).select_from(User).scalar()

# In[51]:

# in terms of primary key:
session.query(func.count(User.id)).scalar()

# In[52]:

# Building a Relationship

# In[53]:

from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship


class Address(Base):
    """ORM model mapped to the ``addresses`` table; each row points at a user."""

    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))

    user = relationship("User", back_populates="addresses")

    def __repr__(self):
        """Return a readable summary showing the e-mail address."""
        return "<Address(email_address='%s')>" % self.email_address


# Reverse side of Address.user, attached after Address exists so that
# Address.id can be used for ordering.
User.addresses = relationship("Address", order_by=Address.id, back_populates="user")

# In[54]:

# create addresses table:
Base.metadata.create_all(engine)

# In[55]:

# Working with Related Objects

# In[56]:

jack = User(name='jack', fullname='Jack Bean', password='gjffdd')
jack.addresses

# In[57]:

ed_user.addresses

# In[58]:

jack.addresses = [
    Address(email_address='jack@google.com'),
    Address(email_address='j25@yahoo.com'),
]

# In[59]:

jack.addresses[1]

# In[60]:

jack.addresses[1].user

# In[61]:

# commit to database:
session.add(jack)
session.commit()

# In[62]:

jack = session.query(User).filter_by(name='jack').one()

# In[63]:

jack

# In[64]:

jack.addresses

# In[65]:

# Querying with Joins

# In[66]:

for u, a in (session.query(User, Address)
             .filter(User.id == Address.user_id)
             .filter(Address.email_address == 'jack@google.com')
             .all()):
    print(u)
    print(a)

# In[67]:

session.query(User).join(Address).filter(
    Address.email_address == 'jack@google.com').all()

# In[68]:

# ways to specify a join:
# query.join(Address, User.id==Address.user_id)  # explicit condition
# query.join(User.addresses)                     # specify relationship from left to right
# query.join(Address, User.addresses)            # same, with explicit target
# query.join('addresses')                        # same, using a string
# query.outerjoin(User.addresses)                # LEFT OUTER JOIN

# In[69]:

# Using Aliases

# In[70]:

# if the same table is to be referenced twice, it must be aliased with another name.

# In[71]:

from sqlalchemy.orm import aliased
adalias1 = aliased(Address)
adalias2 = aliased(Address)

for username, email1, email2 in (
        session.query(User.name, adalias1.email_address, adalias2.email_address)
        .join(adalias1, User.addresses)
        .join(adalias2, User.addresses)
        .filter(adalias1.email_address == 'jack@google.com')
        .filter(adalias2.email_address == 'j25@yahoo.com')):
    print(username, email1, email2)

# In[72]:

# Using Subqueries

# In[73]:

from sqlalchemy.sql import func
stmt = session.query(
    Address.user_id, func.count('*').label('address_count')
).group_by(Address.user_id).subquery()

# In[74]:

for u, count in (session.query(User, stmt.c.address_count)
                 .outerjoin(stmt, User.id == stmt.c.user_id)
                 .order_by(User.id)):
    print(u, count)

# In[75]:

# Selecting Entities from Subqueries

# In[76]:

stmt = session.query(Address).filter(
    Address.email_address != 'j25@yahoo.com').subquery()
adalias = aliased(Address, stmt)

for user, address in session.query(User, adalias).join(adalias, User.addresses):
    print(user)
    print(address)

# In[77]:

# Using Exists

# In[78]:

from sqlalchemy.sql import exists
stmt = exists().where(Address.user_id == User.id)

for name, in session.query(User.name).filter(stmt):
    print(name)

# In[79]:

for name, in session.query(User.name).filter(User.addresses.any()):
    print(name)

# In[80]:

session.query(Address).filter(~Address.user.has(User.name == 'jack')).all()

# In[81]:

# Relationship Operators
# ==  !=  ==None  .contains()  .any()  .has()  .with_parent()

# In[82]:

# Eager Loading
# Subquery Load

# In[83]:

from sqlalchemy.orm import subqueryload
jack = session.query(User).options(
    subqueryload(User.addresses)).filter_by(name='jack').one()
jack

# In[84]:

jack.addresses

# In[85]:

# Joined Load

# In[86]:

from sqlalchemy.orm import joinedload
jack = session.query(User).options(
    joinedload(User.addresses)).filter_by(name='jack').one()
jack

# In[87]:

# Explicit join + Eagerload

# In[88]:

from sqlalchemy.orm import contains_eager
jacks_addresses = (session.query(Address)
                   .join(Address.user)
                   .filter(User.name == 'jack')
                   .options(contains_eager(Address.user))
                   .all())
jacks_addresses

# In[89]:

jacks_addresses[0].user

# In[90]:

# Deleting

# In[91]:

session.delete(jack)
session.query(User).filter_by(name='jack').count()

# In[92]:

session.query(Address).filter(
    Address.email_address.in_(['jack@google.com', 'j25@yahoo.com'])).count()

# In[93]:

session.close()

# In[94]:

# Start over with a fresh declarative base so User and Address can be
# re-declared below with cascade rules on the relationship.
Base = declarative_base()

# In[95]:


class User(Base):
    """ORM model for ``users`` whose addresses cascade on delete."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column(String)

    addresses = relationship("Address", back_populates='user',
                             cascade="all, delete, delete-orphan")

    def __repr__(self):
        """Return a readable summary of the user's column values."""
        return "<User(name='%s', fullname='%s', password='%s')>" % (
            self.name, self.fullname, self.password)

# In[96]:


class Address(Base):
    """ORM model mapped to the ``addresses`` table; each row points at a user."""

    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))

    user = relationship("User", back_populates="addresses")

    def __repr__(self):
        """Return a readable summary showing the e-mail address."""
        return "<Address(email_address='%s')>" % self.email_address

# In[98]:

# NOTE(review): the primary key 6 is hard-coded and presumably refers to
# jack's row — confirm against the rows actually inserted above.
jack = session.query(User).get(6)
del jack.addresses[1]
session.query(Address).filter(
    Address.email_address.in_(['jack@google.com', 'j25@yahoo.com'])).count()

# In[99]:

session.delete(jack)

# In[100]:

session.query(User).filter_by(name='jack').count()

# In[101]:

session.query(Address).filter(
    Address.email_address.in_(['jack@google.com', 'j25@yahoo.com'])).count()

# In[102]:

# Building a Many to Many Relationship

# In[103]:

from sqlalchemy import Table, Text
# association table
post_keywords = Table(
    'post_keywords', Base.metadata,
    Column('post_id', ForeignKey('posts.id'), primary_key=True),
    Column('keyword_id', ForeignKey('keywords.id'), primary_key=True),
)

# In[104]:


class BlogPost(Base):
    """ORM model mapped to the ``posts`` table."""

    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    headline = Column(String(255), nullable=False)
    body = Column(Text)

    # many to many BlogPost<->Keyword
    keywords = relationship('Keyword', secondary=post_keywords,
                            back_populates='posts')

    def __init__(self, headline, body, author):
        """Store the headline, body text and authoring user on the post."""
        self.author = author
        self.headline = headline
        self.body = body

    def __repr__(self):
        """Return a constructor-like summary of the post."""
        return "BlogPost(%r, %r, %r)" % (self.headline, self.body, self.author)

# In[105]:


class Keyword(Base):
    """ORM model mapped to the ``keywords`` table; keyword text is unique."""

    __tablename__ = 'keywords'

    id = Column(Integer, primary_key=True)
    keyword = Column(String(50), nullable=False, unique=True)

    posts = relationship('BlogPost', secondary=post_keywords,
                         back_populates='keywords')

    def __init__(self, keyword):
        """Store the keyword text."""
        self.keyword = keyword

# In[107]:

BlogPost.author = relationship(User, back_populates="posts")
User.posts = relationship(BlogPost, back_populates="author", lazy="dynamic")

# In[108]:

Base.metadata.create_all(engine)

# In[109]:

wendy = session.query(User).filter_by(name='wendy').one()
post = BlogPost("Wendy's Blog Post", "This is a test", wendy)
session.add(post)

# In[110]:

post.keywords.append(Keyword('wendy'))
post.keywords.append(Keyword('firstpost'))

# In[111]:

session.query(BlogPost).filter(BlogPost.keywords.any(keyword='firstpost')).all()

# In[112]:

session.query(BlogPost).filter(BlogPost.author == wendy).filter(
    BlogPost.keywords.any(keyword='firstpost')).all()

# In[113]:

wendy.posts.filter(BlogPost.keywords.any(keyword='firstpost')).all()

# In[ ]:

# In[ ]:

# In[ ]:

# In[ ]:

# In[ ]: