SQLAlchemy ORM basics

by Alex

Adding data

To create a new record with data using SQLAlchemy, you need to do the following steps:

  1. Create object
  2. Add object to a session
  3. Commit the session

In SQLAlchemy, interaction with the database takes place through a session. Fortunately, you don’t have to create it manually: Flask-SQLAlchemy does this for you, and the session object is available as db.session. The session is responsible for connecting to the database and for managing the transaction. By default, the transaction starts and stays open until it is committed or rolled back. Let’s run the Python shell to create some model objects:

(env) ~/flask_app$ python main2.py shell
>>>
>>> from main2 import db, Post, Tag, Category
>>>
>>>
>>> c1 = Category(name='Python', slug='python')
>>> c2 = Category(name='Java', slug='java')
>>>

Two Category objects were created. You can access their attributes using the dot operator (.):

>>>
>>> c1.name, c1.slug
('Python', 'python')
>>>
>>> c2.name, c2.slug
('Java', 'java')
>>>

Next you need to add the objects to the session.

>>>
>>> db.session.add(c1)
>>> db.session.add(c2)
>>>

Adding objects does not write them to the database; it only queues them to be saved at the next commit. You can verify this by checking the primary key of the objects.

>>>
>>> print(c1.id)
None
>>>
>>> print(c2.id)
None
>>>

The id attribute of both objects is None. This means that the objects are not stored in the database. Instead of adding one object to the session each time, you can use the add_all() method. The add_all() method takes a list of objects to add to the session.

>>>
>>> db.session.add_all([c1, c2])
>>>

If you try to add an object to the session more than once, no errors will occur. You can view all objects in the session at any time using db.session.new.

>>>
>>> db.session.new
IdentitySet([<None:Python>, <None:Java>])
>>>

Finally, you need to call the commit() method to save the objects in the database:

>>>
>>> db.session.commit()
>>>

If you refer to the id attribute of the Category object now, it will return the primary key, not None.

>>>
>>> print(c1.id)
1
>>>
>>> print(c2.id)
2
>>>
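
The add-then-commit cycle can also be tried outside Flask. What follows is only a minimal sketch, assuming plain SQLAlchemy (1.4 or newer) with an in-memory SQLite database in place of the tutorial's db object; the Category model is a trimmed stand-in with assumed column sizes, not the one from main2.py.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Category(Base):
    # Trimmed stand-in for the tutorial's model (column sizes are assumed).
    __tablename__ = "categories"
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)
    slug = Column(String(255), nullable=False)


# Assumption: in-memory SQLite instead of the tutorial's database server.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

# Step 1: create the objects.
c1 = Category(name="Python", slug="python")
c2 = Category(name="Java", slug="java")

# Step 2: add them to the session; nothing is written yet.
session.add_all([c1, c2])
pending_before = len(session.new)
id_before = c1.id

# Step 3: commit, which sends the INSERTs and assigns primary keys.
session.commit()
pending_after = len(session.new)
ids_after = sorted([c1.id, c2.id])

print(pending_before, id_before, pending_after, ids_after)
```

The values captured before and after commit() mirror what the shell session shows: the primary key stays None until the commit, and session.new is empty afterwards.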

At this point, the categories table in HeidiSQL should contain the two new rows. The new categories are not yet linked to any posts, so c1.posts and c2.posts return empty lists.

>>>
>>> c1.posts
[]
>>>
>>> c2.posts
[]
>>>

It is worth trying to create multiple posts.

>>>
>>> p1 = Post(title='Post 1', slug='post-1', content='Post 1', category=c1)
>>> p2 = Post(title='Post 2', slug='post-2', content='Post 2', category=c1)
>>> p3 = Post(title='Post 3', slug='post-3', content='Post 3', category=c2)
>>>

Instead of passing the category when creating the Post object, you can assign it afterwards:

>>>
>>> p1.category = c1
>>>

Next you need to add objects to the session and commit.

>>>
>>> db.session.add_all([p1, p2, p3])
>>> db.session.commit()
>>>

If you try to access the posts attribute of the Category object now, it will return a non-empty list:

>>>
>>> c1.posts
[<1:Post 1>, <2:Post 2>]
>>>
>>> c2.posts
[<3:Post 3>]
>>>

On the other side of the relationship, you can access the Category object to which the post belongs by using the category attribute of the post object.

>>>
>>> p1.category
<1:Python>
>>>
>>> p2.category
<1:Python>
>>>
>>> p3.category
<2:Java>
>>>
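
To see both sides of the relationship in one place, here is a small sketch under the same kind of assumptions: plain SQLAlchemy with in-memory SQLite, and simplified Category and Post models that only approximate the ones in main2.py. It defines relationship() with a backref and shows the two ways of setting the category.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Category(Base):
    __tablename__ = "categories"
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)
    # backref adds a matching `category` attribute to Post.
    posts = relationship("Post", backref="category")


class Post(Base):
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    title = Column(String(255), nullable=False)
    category_id = Column(Integer, ForeignKey("categories.id"))


# Assumption: in-memory SQLite instead of the tutorial's database server.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

python = Category(name="Python")
p1 = Post(title="Post 1", category=python)  # set in the constructor
p2 = Post(title="Post 2")
p2.category = python                        # or assigned afterwards

# Adding the posts also brings in the category they point to.
session.add_all([p1, p2])
session.commit()

titles = sorted(post.title for post in python.posts)
print(titles, p1.category.name)
```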

It’s worth recalling that all this is possible thanks to the relationship() instruction in the Category model. Right now there are three posts in the database, but none of them are associated with tags.

>>>
>>> p1.tags, p2.tags, p3.tags
([], [], [])
>>>

It’s time to create tags. This can be done in the shell as follows:

>>>
>>> t1 = Tag(name="refactoring", slug="refactoring")
>>> t2 = Tag(name="snippet", slug="snippet")
>>> t3 = Tag(name="analytics", slug="analytics")
>>>
>>> db.session.add_all([t1, t2, t3])
>>> db.session.commit()
>>>

This code creates three tag objects and commits them to the database. The posts are still not linked to tags. Here’s how you can bind a Post object to a Tag object.

>>>
>>> p1.tags.append(t1)
>>> p1.tags.extend([t2, t3])
>>> p2.tags.append(t2)
>>> p3.tags.append(t3)
>>>
>>> db.session.add_all([p1, p2, p3])
>>>
>>> db.session.commit()
>>>

This commit adds five rows to the post_tags table. The posts are now associated with one or more tags:

>>>
>>> p1.tags
[<1:refactoring>, <2:snippet>, <3:analytics>]
>>>
>>> p2.tags
[<2:snippet>]
>>>
>>> p3.tags
[<3:analytics>]
>>>

On the other hand, you can access posts that are related to a specific tag:

>>>
>>> t1.posts
[<1:Post 1>]
>>>
>>> t2.posts
[<1:Post 1>, <2:Post 2>]
>>>
>>> t3.posts
[<1:Post 1>, <3:Post 3>]
>>>
>>>

It’s important to note that instead of initially committing Tag objects and then linking them to Post objects, it can all be done this way as well:

>>>
>>> t1 = Tag(name="refactoring", slug="refactoring")
>>> t2 = Tag(name="snippet", slug="snippet")
>>> t3 = Tag(name="analytics", slug="analytics")
>>>
>>> p1.tags.append(t1)
>>> p1.tags.extend([t2, t3])
>>> p2.tags.append(t2)
>>> p3.tags.append(t3)
>>>
>>> db.session.add(p1)
>>> db.session.add(p2)
>>> db.session.add(p3)
>>>
>>> db.session.commit()
>>>

Note that on lines 11-13, only Post objects are added to the session. Tag and Post objects are linked by a many-to-many relationship. As a result, adding a Post object to the session entails adding Tag objects associated with it. But even if you manually add Tag objects to a session now, there is no error.
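
The many-to-many behaviour described here can be sketched as follows. This is an illustration only, assuming plain SQLAlchemy with in-memory SQLite; the post_tags table and the trimmed Post and Tag models are assumed shapes, not copies of main2.py. Only the posts are added to the session, and the example checks that the tags come along.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, Table, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()

# Association table: one row per (post, tag) link.
post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id")),
    Column("tag_id", Integer, ForeignKey("tags.id")),
)


class Post(Base):
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    title = Column(String(255), nullable=False)
    tags = relationship("Tag", secondary=post_tags, backref="posts")


class Tag(Base):
    __tablename__ = "tags"
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)


# Assumption: in-memory SQLite instead of the tutorial's database server.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

p1, p2, p3 = (Post(title=f"Post {n}") for n in (1, 2, 3))
t1, t2, t3 = (Tag(name=n) for n in ("refactoring", "snippet", "analytics"))

p1.tags.extend([t1, t2, t3])
p2.tags.append(t2)
p3.tags.append(t3)

# Only posts are added; the linked tags are pulled in by the default cascade.
session.add_all([p1, p2, p3])
tags_pending = all(tag in session.new for tag in (t1, t2, t3))

session.commit()
link_rows = session.query(post_tags).count()
snippet_posts = sorted(post.title for post in t2.posts)
print(tags_pending, link_rows, snippet_posts)
```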

Updating data

To update an object, all you have to do is pass a new value to its attribute, add the object to the session, and commit.

>>>
>>> p1.content # initial value
'Post 1'
>>>
>>> p1.content = "This is content for post 1" # set new value
>>> db.session.add(p1)
>>>
>>> db.session.commit()
>>>
>>> p1.content # updated value
'This is content for post 1'
>>>
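
A short sketch of the update cycle, again assuming plain SQLAlchemy with in-memory SQLite and a trimmed Post model. It also suggests that an object already attached to the session is tracked automatically, so the extra add() call appears to be harmless rather than strictly required.

```python
from sqlalchemy import Column, Integer, String, Text, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Post(Base):
    # Trimmed stand-in for the tutorial's Post model.
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    title = Column(String(255), nullable=False)
    content = Column(Text, nullable=False)


# Assumption: in-memory SQLite instead of the tutorial's database server.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

post = Post(title="Post 1", content="Post 1")
session.add(post)
session.commit()

original = post.content                      # initial value, loaded from the database
post.content = "This is content for post 1"  # set new value
tracked = post in session.dirty              # the change is noticed without add()
session.commit()

# Read the column back with a fresh query to confirm the UPDATE happened.
stored = session.query(Post.content).filter(Post.id == post.id).scalar()
print(original, tracked, stored)
```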

Deleting data

To delete an object, use the delete() method of the session object. It accepts the object and marks it for deletion at the next commit. Create a new temporary seo tag and associate it with posts p1 and p2:

>>>
>>> tmp = Tag(name='seo', slug='seo') # create a temporary Tag object
>>>
>>> p1.tags.append(tmp)
>>> p2.tags.append(tmp)
>>>
>>> db.session.add_all([p1, p2])
>>> db.session.commit()
>>>

This commit adds just three rows: one to the tags table and two to the post_tags table. Now you need to remove the seo tag:

>>>
>>> db.session.delete(tmp)
>>> db.session.commit()
>>>

This commit removes all three rows added in the previous step. However, it does not remove the posts the tag was associated with. By default, when you delete an object in the parent table (e.g., categories), the foreign key value of the object that is associated with it in the child table (e.g., posts) becomes NULL. The following code demonstrates this behavior by creating a new category object and the post object associated with it and then deleting the category object:

>>>
>>> c4 = Category(name='css', slug='css')
>>> p4 = Post(title='Post 4', slug='post-4', content='Post 4', category=c4)
>>>
>>> db.session.add(c4)
>>>
>>> db.session.new
IdentitySet([<None:css>, <None:Post 4>])
>>>
>>> db.session.commit()
>>>

This commit adds two rows: one to the categories table and one to the posts table. Now let’s see what happens when the Category object is deleted.

>>>
>>> db.session.delete(c4)
>>> db.session.commit()
>>>
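
The default behaviour can be checked with a sketch like the one that follows. It assumes plain SQLAlchemy with in-memory SQLite and simplified models, with the relationship left at its default cascade: deleting the category keeps the post and clears its category_id.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Category(Base):
    __tablename__ = "categories"
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)
    posts = relationship("Post", backref="category")  # default cascade


class Post(Base):
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    title = Column(String(255), nullable=False)
    category_id = Column(Integer, ForeignKey("categories.id"))


# Assumption: in-memory SQLite instead of the tutorial's database server.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

css = Category(name="css")
post = Post(title="Post 4", category=css)
session.add(css)  # the post is added together with its category
session.commit()

session.delete(css)
session.commit()

categories_left = session.query(Category).count()
orphan = session.query(Post).filter(Post.title == "Post 4").one()
print(categories_left, orphan.category_id)
```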

This commit removes the css category from the categories table and sets the foreign key (category_id) of the post associated with it to NULL. In some cases you may want the child records to be deleted along with their parent record. This can be done by passing cascade='all,delete-orphan' to the db.relationship() instruction. Let’s open main2.py and change the db.relationship() instruction in the Category model:

#...
class Category(db.Model):
    #...
    posts = db.relationship('Post', backref='category', cascade='all,delete-orphan')
#...

From this point on, deleting a category will cause the posts that are associated with it to be deleted. You need to restart the shell to make this work. Next, import the desired objects and create the category along with the post:

(env) ~/flask_app$ python main2.py shell
>>>
>>> from main2 import db, Post, Tag, Category
>>>
>>> c5 = Category(name='css', slug='css')
>>> p5 = Post(title='Post 5', slug='post-5', content='Post 5', category=c5)
>>>
>>> db.session.add(c5)
>>> db.session.commit()
>>>

After this commit, the database contains the new category and its post. Let’s delete the category.

>>>
>>> db.session.delete(c5)
>>> db.session.commit()
>>>
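
The effect of cascade='all,delete-orphan' can be sketched as follows, assuming plain SQLAlchemy with in-memory SQLite and simplified models. The second post ("Post 6") is a hypothetical addition used to show the delete-orphan part: a post removed from its category is deleted too.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Category(Base):
    __tablename__ = "categories"
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)
    posts = relationship("Post", backref="category", cascade="all, delete-orphan")


class Post(Base):
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    title = Column(String(255), nullable=False)
    category_id = Column(Integer, ForeignKey("categories.id"))


# Assumption: in-memory SQLite instead of the tutorial's database server.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

css = Category(name="css")
p5 = Post(title="Post 5", category=css)
p6 = Post(title="Post 6", category=css)  # hypothetical extra post
session.add(css)
session.commit()

# delete-orphan: a post removed from its category is deleted as well.
css.posts.remove(p6)
session.commit()
after_remove = session.query(Post).count()

# delete: removing the category removes its remaining posts.
session.delete(css)
session.commit()
after_delete = session.query(Post).count()
print(after_remove, after_delete)
```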

After this commit, both the category and the post associated with it are removed from the database.

Querying data

To query a database, the query() method of the session object is used. The query() method returns a flask_sqlalchemy.BaseQuery object, which is an extension to the original sqlalchemy.orm.query.Query object. The flask_sqlalchemy.BaseQuery object is a SELECT statement that will be used to make queries to the database. This table lists the main methods of the flask_sqlalchemy.BaseQuery class.

Method                 Description
all()                  Returns the query result (represented by flask_sqlalchemy.BaseQuery) as a list.
count()                Returns the number of records in the query.
first()                Returns the first query result, or None if it has no rows.
first_or_404()         Returns the first query result, or a 404 error if it has no rows.
get(pk)                Returns the object that matches the given primary key, or None if no object is found.
get_or_404(pk)         Returns the object that matches the given primary key, or a 404 error if no object is found.
filter(*criterion)     Returns a new flask_sqlalchemy.BaseQuery instance with a WHERE clause.
limit(limit)           Returns a new flask_sqlalchemy.BaseQuery instance with a LIMIT clause.
offset(offset)         Returns a new flask_sqlalchemy.BaseQuery instance with an OFFSET clause.
order_by(*criterion)   Returns a new flask_sqlalchemy.BaseQuery instance with an ORDER BY clause.
join()                 Returns a new flask_sqlalchemy.BaseQuery instance after creating an SQL JOIN.

The all() method

In its simplest form, the query() method takes one or more model classes or columns as arguments. The following code will return all records from the posts table.

>>>
>>> db.session.query(Post).all()
[<1:Post 1>, <2:Post 2>, <3:Post 3>, <4:Post 4>]
>>>

Similarly, the following code will return all records from categories and tags.

>>>
>>> db.session.query(Category).all()
[<1:Python>, <2:Java>]
>>>
>>>
>>> db.session.query(Tag).all()
[<1:refactoring>, <2:snippet>, <3:analytics>]
>>>

To get the raw SQL used to query the database, just print the flask_sqlalchemy.BaseQuery object:

>>>
>>> print(db.session.query(Post))
SELECT
    posts.id AS posts_id,
    posts.title AS posts_title,
    posts.slug AS posts_slug,
    posts.content AS posts_content,
    posts.created_on AS posts_created_on,
    posts.updated_on AS posts_updated_on,
    posts.category_id AS posts_category_id
FROM
    posts
>>>

In the previous examples, data was returned from all columns in the table. This can be changed by passing the column names to the query() method:

>>>
>>> db.session.query(Post.id, Post.title).all()
[(1, 'Post 1'), (2, 'Post 2'), (3, 'Post 3'), (4, 'Post 4')]
>>>

The count() method

The count() method returns the number of results in the query.

>>>
>>> db.session.query(Post).count() # get total number of records in the Post table
4
>>> db.session.query(Category).count() # get total number of records in the Category table
2
>>> db.session.query(Tag).count() # get total number of records in the Tag table
3
>>>

The first() method

The first() method returns only the first result of the query, or None if the query has no results.

>>>
>>> db.session.query(Post).first()
<1:Post 1>
>>>
>>> db.session.query(Category).first()
<1:Python>
>>>
>>> db.session.query(Tag).first()
<1:refactoring>
>>>

The get() method

The get() method will return an instance of the object with the corresponding primary key, or None if no such object was found.

>>>
>>> db.session.query(Post).get(2)
<2:Post 2>
>>>
>>> db.session.query(Category).get(1)
<1:Python>
>>>
>>> print(db.session.query(Category).get(10)) # nothing found by primary key 10
None
>>>
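
The "or None" behaviour of first() and get() is easy to check in isolation. The sketch that follows assumes plain SQLAlchemy 1.4 or newer with in-memory SQLite and a trimmed Tag model; it uses session.get(Model, pk), which, as far as I can tell, is the newer spelling of the primary-key lookup that query(Model).get(pk) performs.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Tag(Base):
    # Trimmed stand-in for the tutorial's Tag model.
    __tablename__ = "tags"
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)


# Assumption: in-memory SQLite instead of the tutorial's database server.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

session.add(Tag(id=1, name="refactoring"))
session.commit()

first_tag = session.query(Tag).first()
no_match = session.query(Tag).filter(Tag.name == "missing").first()

by_pk = session.get(Tag, 1)        # found: the same object as first_tag
missing_pk = session.get(Tag, 10)  # not found: None, no exception

print(first_tag.name, no_match, by_pk is first_tag, missing_pk)
```

Because neither call raises on a miss, code that uses the result should check for None before touching its attributes.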

The get_or_404() method

Same as the get() method, but instead of None it will return a 404 error if the object was not found.

>>>
>>> db.session.query(Post).get_or_404(1)
<1:Post 1>
>>>
>>>
>>> db.session.query(Post).get_or_404(100)
Traceback (most recent call last):
...
werkzeug.exceptions.NotFound: 404 Not Found: The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.
>>>

The filter() method

The filter() method lets you filter results by adding a WHERE clause to the query. It accepts one or more conditions, each built from a column, an operator, and a value. For example:

>>>
>>> db.session.query(Post).filter(Post.title == 'Post 1').all()
[<1:Post 1>]
>>>

The query will return all posts with the title "Post 1". The SQL equivalent of the query is as follows:

>>>
>>> print(db.session.query(Post).filter(Post.title == 'Post 1'))
SELECT
    posts.id AS posts_id,
    posts.title AS posts_title,
    posts.slug AS posts_slug,
    posts.content AS posts_content,
    posts.created_on AS posts_created_on,
    posts.updated_on AS posts_updated_on,
    posts.category_id AS posts_category_id
FROM
    posts
WHERE
    posts.title = %(title_1)s
>>>
>>>

The string %(title_1)s in the WHERE condition is a placeholder. The actual value is substituted when the query is executed. Several conditions can be passed to the filter() method, and they will be combined with the AND operator in SQL. For example:

>>>
>>> db.session.query(Post).filter(Post.id >= 1, Post.id <= 2).all()
[<1:Post 1>, <2:Post 2>]
>>>
>>>

This query will return all posts whose primary key is greater than or equal to 1 and less than or equal to 2. SQL equivalent:

>>>
>>> print(db.session.query(Post).filter(Post.id >= 1, Post.id <= 2))
SELECT
    posts.id AS posts_id,
    posts.title AS posts_title,
    posts.slug AS posts_slug,
    posts.content AS posts_content,
    posts.created_on AS posts_created_on,
    posts.updated_on AS posts_updated_on,
    posts.category_id AS posts_category_id
FROM
    posts
WHERE 
    posts.id >= %(id_1)s
AND posts.id <= %(id_2)s
>>>
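
To look at the placeholders and their values separately, a statement can be compiled without touching a database. This is a sketch under assumptions: it uses SQLAlchemy's select() construct (1.4 or newer) rather than db.session.query(), and a trimmed Post model. The generic compiler writes placeholders in its own style, so the text will differ from the %(id_1)s form in the output above, which depends on the database driver.

```python
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Post(Base):
    # Trimmed stand-in for the tutorial's Post model.
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    title = Column(String(255), nullable=False)


# Two conditions passed together are combined with AND.
stmt = select(Post.id).where(Post.id >= 1, Post.id <= 2)

compiled = stmt.compile()  # generic dialect, no database connection needed
sql_text = str(compiled)   # SQL text with placeholders
bound_values = sorted(compiled.params.values())  # values sent alongside it

print(sql_text)
print(compiled.params)
```

Keeping the values out of the SQL text is what allows the driver to pass them safely as parameters instead of splicing them into the string.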

The first_or_404() method

Does the same as the first() method, but returns a 404 error instead of None if the query has no result.

>>>
>>> db.session.query(Post).filter(Post.id > 1).first_or_404()
<2:Post 2>
>>>
>>> db.session.query(Post).filter(Post.id > 10).first_or_404()
Traceback (most recent call last):
...
werkzeug.exceptions.NotFound: 404 Not Found: The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.
>>>

The limit() method

The limit() method adds a LIMIT clause to the query. It takes the maximum number of rows the query should return.

>>>
>>> db.session.query(Post).limit(2).all()
[<1:Post 1>, <2:Post 2>]
>>>
>>> db.session.query(Post).filter(Post.id >= 2).limit(1).all()
[<2:Post 2>]
>>>

SQL equivalent:

>>>
>>> print(db.session.query(Post).limit(2))
SELECT
    posts.id AS posts_id,
    posts.title AS posts_title,
    posts.slug AS posts_slug,
    posts.content AS posts_content,
    posts.created_on AS posts_created_on,
    posts.updated_on AS posts_updated_on,
    posts.category_id AS posts_category_id
FROM
    posts
LIMIT %(param_1)s
>>>
>>>
>>> print(db.session.query(Post).filter(Post.id >= 2).limit(1))
SELECT
    posts.id AS posts_id,
    posts.title AS posts_title,
    posts.slug AS posts_slug,
    posts.content AS posts_content,
    posts.created_on AS posts_created_on,
    posts.updated_on AS posts_updated_on,
    posts.category_id AS posts_category_id
FROM
    posts
WHERE 
    posts.id >= %(id_1)s
LIMIT %(param_1)s
>>>
>>>

The offset() method

The offset() method adds an OFFSET condition to the query. It takes an offset as an argument. It is often used together with limit().

>>>
>>> db.session.query(Post).filter(Post.id > 1).limit(3).offset(1).all()
[<3:Post 3>, <4:Post 4>]
>>>

SQL equivalent:

>>>
>>> print(db.session.query(Post).filter(Post.id > 1).limit(3).offset(1))
SELECT
    posts.id AS posts_id,
    posts.title AS posts_title,
    posts.slug AS posts_slug,
    posts.content AS posts_content,
    posts.created_on AS posts_created_on,
    posts.updated_on AS posts_updated_on,
    posts.category_id AS posts_category_id
FROM
    posts
WHERE 
    posts.id > %(id_1)s
LIMIT %(param_1)s, %(param_2)s
>>>

The placeholders %(param_1)s and %(param_2)s stand for the offset and the row limit, respectively.
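
A common use of limit() together with offset() is pagination. The sketch that follows assumes plain SQLAlchemy with in-memory SQLite and a trimmed Post model; page_of_posts() is a hypothetical helper written for this illustration, not part of SQLAlchemy or Flask-SQLAlchemy. An explicit order_by() is added because, without one, the order of the rows is not guaranteed.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Post(Base):
    # Trimmed stand-in for the tutorial's Post model.
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    title = Column(String(255), nullable=False)


# Assumption: in-memory SQLite instead of the tutorial's database server.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

session.add_all([Post(id=n, title=f"Post {n}") for n in range(1, 5)])
session.commit()


def page_of_posts(page, per_page):
    """Return one page of posts; `page` counts from 1 (hypothetical helper)."""
    return (
        session.query(Post)
        .order_by(Post.id)
        .limit(per_page)
        .offset((page - 1) * per_page)
        .all()
    )


page_two = [post.title for post in page_of_posts(page=2, per_page=2)]

# Same shape as the shell example: skip one matching row, return up to three.
skipped = [
    post.id
    for post in session.query(Post)
    .filter(Post.id > 1)
    .order_by(Post.id)
    .limit(3)
    .offset(1)
]
print(page_two, skipped)
```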

The order_by() method

The order_by() method is used to order the result by adding an ORDER BY clause to the query. It takes one or more columns to sort by. By default, it sorts in ascending order.

>>>
>>> db.session.query(Tag).all()
[<1:refactoring>, <2:snippet>, <3:analytics>]
>>>
>>> db.session.query(Tag).order_by(Tag.name).all()
[<3:analytics>, <1:refactoring>, <2:snippet>]
>>>

To sort in descending order, use db.desc():

>>>
>>> db.session.query(Tag).order_by(db.desc(Tag.name)).all()
[<2:snippet>, <1:refactoring>, <3:analytics>]
>>>
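
The ordering examples can be reproduced with the sketch that follows, assuming plain SQLAlchemy with in-memory SQLite and a trimmed Tag model. Besides the desc() function (imported here from sqlalchemy rather than reached through db), it shows the equivalent .desc() method on the column.

```python
from sqlalchemy import Column, Integer, String, create_engine, desc
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Tag(Base):
    # Trimmed stand-in for the tutorial's Tag model.
    __tablename__ = "tags"
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)


# Assumption: in-memory SQLite instead of the tutorial's database server.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

session.add_all([Tag(name=n) for n in ("refactoring", "snippet", "analytics")])
session.commit()

ascending = [tag.name for tag in session.query(Tag).order_by(Tag.name)]
descending = [tag.name for tag in session.query(Tag).order_by(desc(Tag.name))]
# The column method is an alternative spelling of the same ORDER BY ... DESC.
descending_too = [tag.name for tag in session.query(Tag).order_by(Tag.name.desc())]

print(ascending, descending)
```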

The join() method

The join() method is used to create a JOIN in SQL. It takes the model (or table) you want to join to.

>>>
>>> db.session.query(Post).join(Category).all()
[<1:Post 1>, <2:Post 2>, <3:Post 3>]
>>>

SQL equivalent:

>>>
>>> print(db.session.query(Post).join(Category))
SELECT
    posts.id AS posts_id,
    posts.title AS posts_title,
    posts.slug AS posts_slug,
    posts.content AS posts_content,
    posts.created_on AS posts_created_on,
    posts.updated_on AS posts_updated_on,
    posts.category_id AS posts_category_id
FROM
    posts
INNER JOIN categories ON categories.id = posts.category_id
>>>

The join() method is commonly used to get data from two or more tables in a single query. For example:

>>>
>>> db.session.query(Post.title, Category.name).join(Category).all()
[('Post 1', 'Python'), ('Post 2', 'Python'), ('Post 3', 'Java')]
>>>
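
Note that Post 4, whose category was deleted earlier, does not appear in the joined results. A sketch of why, assuming plain SQLAlchemy with in-memory SQLite and simplified models: join() produces an inner join, which drops posts without a category, while outerjoin() keeps them with None in place of the category name.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Category(Base):
    __tablename__ = "categories"
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)
    posts = relationship("Post", backref="category")


class Post(Base):
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    title = Column(String(255), nullable=False)
    category_id = Column(Integer, ForeignKey("categories.id"))


# Assumption: in-memory SQLite instead of the tutorial's database server.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

python = Category(name="Python")
session.add_all([
    Post(id=1, title="Post 1", category=python),
    Post(id=4, title="Post 4"),  # no category, like Post 4 in the tutorial
])
session.commit()

columns = session.query(Post.title, Category.name)
inner = [tuple(row) for row in columns.join(Category).order_by(Post.id)]
outer = [tuple(row) for row in columns.outerjoin(Category).order_by(Post.id)]
print(inner)
print(outer)
```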

You can create a JOIN for more than two tables using a chain of join() methods:

db.session.query(Table1).join(Table2).join(Table3).join(Table4).all()

You can finish the lesson by completing the contact form. Recall that in the lesson “Working with Forms in Flask” we created a contact form to receive feedback from users. So far, the contact() view function doesn’t save the sent data. It only outputs it in the console. To save this information, we first need to create a new table. Let’s open main2.py to add the Feedback model after the Tag model:

#...
class Feedback(db.Model):
    __tablename__ = 'feedbacks'
    id = db.Column(db.Integer(), primary_key=True)
    name = db.Column(db.String(1000), nullable=False)
    email = db.Column(db.String(100), nullable=False)
    message = db.Column(db.Text(), nullable=False)
    created_on = db.Column(db.DateTime(), default=datetime.utcnow)

    def __repr__(self):
        return "<{}:{}>".format(self.id, self.name)
#...

Next we need to restart the Python shell and call the create_all() method of the db object to create the feedbacks table:

(env) ~/flask_app$ python main2.py shell
>>>
>>> from main2 import db
>>>
>>> db.create_all()
>>>

You also need to modify the contact() view function:

#...
@app.route('/contact/', methods=['get', 'post'])
def contact():
    form = ContactForm()
    if form.validate_on_submit():
        name = form.name.data
        email = form.email.data
        message = form.message.data
        print(name)
        print(email)
        print(message)

        # save the submitted feedback to the database
        feedback = Feedback(name=name, email=email, message=message)
        db.session.add(feedback)
        db.session.commit()

        print("\nData received. Now redirecting ...")
        flash("Message Received", "success")
        return redirect(url_for('contact'))

    return render_template('contact.html', form=form)
#...
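
If the commit fails (for example, because a required column is empty), the session has to be rolled back before it can be used again. One way to handle that is sketched here, assuming plain SQLAlchemy with in-memory SQLite and a trimmed Feedback model without created_on; save_feedback() is a hypothetical helper for this illustration and not part of the tutorial's code.

```python
from sqlalchemy import Column, Integer, String, Text, create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Feedback(Base):
    # Trimmed stand-in for the Feedback model (created_on omitted).
    __tablename__ = "feedbacks"
    id = Column(Integer, primary_key=True)
    name = Column(String(1000), nullable=False)
    email = Column(String(100), nullable=False)
    message = Column(Text, nullable=False)


# Assumption: in-memory SQLite instead of the tutorial's database server.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)


def save_feedback(session, name, email, message):
    """Store one feedback row; return its id, or None if the commit failed."""
    feedback = Feedback(name=name, email=email, message=message)
    session.add(feedback)
    try:
        session.commit()
    except SQLAlchemyError:
        # Undo the failed transaction so the session stays usable.
        session.rollback()
        return None
    return feedback.id


saved_id = save_feedback(session, "Ann", "ann@example.com", "Hello")
failed = save_feedback(session, None, "ann@example.com", "No name given")
rows = session.query(Feedback).count()
print(saved_id, failed, rows)
```

In the view function, the same pattern would wrap db.session.commit(), with a flash message reporting the failure instead of returning None.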

Start the server and go to http://127.0.0.1:5000/contact/ to fill out and submit the form. The submitted record should then appear in the feedbacks table in HeidiSQL.
