CRUD operations with SQLAlchemy Core

by Alex

SQLAlchemy's SQL Expression Language lets you write SQL statements as Python expressions that work the same way regardless of the database in use. We will use the tables created in the previous article.

Inserting (adding) records

There are several ways to insert records into a database. The main one is the insert() method of a Table instance. Call it, then chain the values() method and pass the column values as keyword arguments:

ins = customers.insert().values(
first_name = 'Dmitriy',
last_name = 'Yatsenko',
username = 'Moseend',
email = '[email protected]',
address = 'Shemilovskiy 2-Y Per., bld. 8/10, appt. 23',
town = 'Vladivostok'
)
print(ins)

To see what SQL code will be generated as a result, you just need to output ins:

INSERT INTO customers (first_name, last_name, username, email, address, town, created_on, updated_on) VALUES (:first_name, :last_name, :username, :email, :address, :town, :created_on, :updated_on)

Note that the VALUES clause contains bound parameters (placeholders in :name format) rather than the values passed to the values() method. The real values are supplied only when the query is executed, and they are passed to the database separately from the SQL text or safely escaped by the driver, which rules out SQL injection. To see which values will take the place of the bound parameters, use ins.compile().params. Output:

{'first_name': 'Dmitriy',
 'last_name': 'Yatsenko',
 'username': 'Moseend',
 'email': '[email protected]',
 'address': 'Shemilovskiy 2-Y Per., bld. 8/10, appt. 23',
 'town': 'Vladivostok',
 'created_on': None,
 'updated_on': None}
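The separation between the SQL text and the bound values can be checked without a database at all. The following is a minimal sketch, assuming a cut-down, hypothetical version of the customers table (demo_customers) rather than the full table from the previous article:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, insert

# Hypothetical cut-down table, defined only for this illustration.
metadata = MetaData()
demo_customers = Table(
    "customers", metadata,
    Column("id", Integer, primary_key=True),
    Column("first_name", String(100)),
    Column("town", String(100)),
)

stmt = insert(demo_customers).values(first_name="Dmitriy", town="Vladivostok")
compiled = stmt.compile()   # compile with the default, database-agnostic dialect

sql_text = str(compiled)    # SQL string that contains only :name placeholders
params = compiled.params    # the values that will be bound at execution time

print(sql_text)
print(params)
```

The literal values never appear in the SQL string; they travel in the params dictionary.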

The statement has been created but not yet sent to the database. To execute it, call the execute() method of a Connection object:

ins = customers.insert().values(
first_name = 'Dmitriy',
last_name = 'Yatsenko',
username = 'Moseend',
email = '[email protected]',
address = 'Shemilovskiy 2-Y Per., bld. 8/10, appt. 23',
town = 'Vladivostok'
)
conn = engine.connect()
r = conn.execute(ins)

This code inserts a new record into the customers table.

The execute() method returns an object of type ResultProxy. It provides several attributes, one of which is inserted_primary_key; it returns the primary key of the inserted record.

Another way to create an insert statement is to use the insert() function from the sqlalchemy library.

from sqlalchemy import insert
ins = insert(customers).values(
first_name = 'Valeriy',
last_name = 'Golyshkin',
username = 'Fortioneaks',
email = '[email protected]',
address = 'Narovchatova, bld. 8, appt. 37',
town = 'Magadan'
)
conn = engine.connect()
r = conn.execute(ins)
print(r.inserted_primary_key)

Output: (2,).
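To try both styles without a PostgreSQL server, here is a minimal sketch that assumes a throwaway in-memory SQLite database and a hypothetical two-column table (demo_customers). It uses engine.begin(), which opens a connection and commits when the block finishes without errors:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, insert

metadata = MetaData()
# Hypothetical cut-down table for the demonstration.
demo_customers = Table(
    "customers", metadata,
    Column("id", Integer, primary_key=True),
    Column("first_name", String(100)),
)

engine = create_engine("sqlite://")   # in-memory SQLite database (an assumption)
metadata.create_all(engine)

with engine.begin() as conn:
    # Style 1: insert() method of the Table instance
    r1 = conn.execute(demo_customers.insert().values(first_name="Dmitriy"))
    # Style 2: standalone insert() function
    r2 = conn.execute(insert(demo_customers).values(first_name="Valeriy"))
    first_pk = r1.inserted_primary_key[0]
    second_pk = r2.inserted_primary_key[0]

print(first_pk, second_pk)
```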

Inserting (adding) multiple records

Instead of passing values to the values() method as keyword arguments, you can pass them to the execute() method.

from sqlalchemy import insert
conn = engine.connect()
ins = insert(customers)
r = conn.execute(ins,
first_name = "Vadim",
last_name = "Moiseenko",
username = "Antence73",
email = "[email protected]",
address = 'Partizanskiy Prospekt, bld. 28/A, appt. 51',
town = ' Vladivostok'
)

The execute() method is quite flexible: it also lets you insert multiple records by passing the values as a list of dictionaries, where each dictionary holds the values for a single row:

r = conn.execute(ins, [
    {
        "first_name": "Vladimir",
        "last_name": "Belousov",
        "username": "andescols",
        "email": "[email protected]",
        "address": "Ul. Usmanova, bld. 70, appt. 223",
        "town": "Naberezhnye Chelny"
    },
    {
        "first_name": "Tatyana",
        "last_name": "Khakimova",
        "username": "Caltin1962",
        "email": "[email protected]",
        "address": "Rossiyskaya, bld. 153, appt. 509",
        "town": "Ufa"
    },
    {
        "first_name": "Pavel",
        "last_name": "Arnautov",
        "username": "Lablen",
        "email": "[email protected]",
        "address": "Krasnoyarskaya Ul., bld. 35, appt. 57",
        "town": "Irkutsk"
    },
])
print(r.rowcount)

Output: 3. Before moving on to the next section, let's also add records to the items, orders, and order_lines tables:

items_list = [
    {"name": "Chair", "cost_price": 9.21, "selling_price": 10.81, "quantity": 6},
    {"name": "Pen", "cost_price": 3.45, "selling_price": 4.51, "quantity": 3},
    {"name": "Headphone", "cost_price": 15.52, "selling_price": 16.81, "quantity": 50},
    {"name": "Travel Bag", "cost_price": 20.1, "selling_price": 24.21, "quantity": 50},
    {"name": "Keyboard", "cost_price": 20.12, "selling_price": 22.11, "quantity": 50},
    {"name": "Monitor", "cost_price": 200.14, "selling_price": 212.89, "quantity": 50},
    {"name": "Watch", "cost_price": 100.58, "selling_price": 104.41, "quantity": 50},
    {"name": "Water Bottle", "cost_price": 20.89, "selling_price": 25.00, "quantity": 50},
]
order_list = [
    {"customer_id": 1},
    {"customer_id": 1},
]
order_line_list = [
    {"order_id": 1, "item_id": 1, "quantity": 5},
    {"order_id": 1, "item_id": 2, "quantity": 2},
    {"order_id": 1, "item_id": 3, "quantity": 1},
    {"order_id": 2, "item_id": 1, "quantity": 5},
    {"order_id": 2, "item_id": 2, "quantity": 5},
]
r = conn.execute(insert(items), items_list)
print(r.rowcount)
r = conn.execute(insert(orders), order_list)
print(r.rowcount)
r = conn.execute(insert(order_lines), order_line_list)
print(r.rowcount)

Output:

8
2
5

Retrieving records

The select() method on an instance of the Table object is used to retrieve records:

s = customers.select()
print(s)

Output:

SELECT customers.id, customers.first_name, customers.last_name, customers.username, customers.email, customers.address, customers.town, customers.created_on, customers.updated_on
FROM customers

This query returns all records from the customers table. You can also use the select() function instead. It takes a list of tables or columns from which to retrieve data.

from sqlalchemy import select
s = select([customers])
print(s)

The output will be the same. To send the query to the database, pass it to the execute() method:

from sqlalchemy import select
conn = engine.connect()
s = select([customers])
r = conn.execute(s)
print(r.fetchall())

Output:

[(1, 'Dmitriy', 'Yatsenko', 'Moseend', '[email protected]', 'Shemilovskiy 2-Y Per. 8/10, appt. 23', ' Vladivostok', datetime.datetime(2021, 4, 21, 17, 33, 35, 172583), datetime.datetime(2021, 4, 21, 17, 33, 35, 172583)), (2, 'Valeriy', 'Golyshkin', 'Fortioneaks', '[email protected]', 'Narovchatova, bld. 8, appt. 37', 'Magadan', datetime.datetime(2021, 4, 21, 17, 54, 30, 209109), datetime.datetime(2021, 4, 21, 17, 54, 30, 209109)),...)]

The fetchall() method of the ResultProxy object returns all records matching the query. Once the results are exhausted, subsequent calls to fetchall() return an empty list. Because fetchall() loads all results into memory at once, it is inefficient for large result sets. Alternatively, you can iterate over the result in a loop:

s = select([customers])
rs = conn.execute(s)
for row in rs:
    print(row)

Output:

(1, 'Dmitriy', 'Yatsenko', 'Moseend', '[email protected]', 'Shemilovskiy 2-Y Per. 8/10, appt. 23', ' Vladivostok', datetime.datetime(2021, 4, 21, 17, 33, 35, 172583), datetime.datetime(2021, 4, 21, 17, 33, 35, 172583))
...
(7, 'Pavel', 'Arnautov', 'Lablen', '[email protected]', 'Krasnoyarskaya Ul., bld. 35, appt. 57', 'Irkutsk', datetime.datetime(2021, 4, 22, 10, 32, 45, 364619), datetime.datetime(2021, 4, 22, 10, 32, 45, 364619))

The following is a list of frequently used methods and attributes of the ResultProxy object:

fetchone() – retrieves the next record from the result. When no records remain, it returns None.
fetchmany(size=None) – retrieves the next set of records from the result. When no records remain, it returns an empty list.
fetchall() – retrieves all remaining records from the result. When no records remain, it returns an empty list.
first() – retrieves the first record from the result and closes the result. The remaining records cannot be retrieved after first() is called; a new query has to be sent with execute().
rowcount – the number of rows matched by the statement. It is intended mainly for UPDATE and DELETE; for SELECT the value depends on the database driver.
keys() – returns the column names of the result.
scalar() – returns the first column of the first record and closes the result. If there is no result, it returns None.

The following terminal sessions show the above methods and attributes in action, where s = select([customers]).

fetchone()

r = conn.execute(s)
print(r.fetchone())
print(r.fetchone())
(1, 'Dmitriy', 'Yatsenko', 'Moseend', '[email protected]', 'Shemilovskiy 2-Y Per. 8/10, appt. 23', ' Vladivostok', datetime.datetime(2021, 4, 21, 17, 33, 35, 172583), datetime.datetime(2021, 4, 21, 17, 33, 35, 172583))
(2, 'Valeriy', 'Golyshkin', 'Fortioneaks', '[email protected]', 'Narovchatova, bld. 8, appt. 37', 'Magadan', datetime.datetime(2021, 4, 21, 17, 54, 30, 209109), datetime.datetime(2021, 4, 21, 17, 54, 30, 209109))

fetchmany()

r = conn.execute(s)
print(r.fetchmany(2))
print(len(r.fetchmany(5)))  # will return 4 because we only have 6 entries
[(1, 'Dmitriy', 'Yatsenko', 'Moseend', '[email protected]', 'Shemilovskiy 2-Y Per. 8/10, appt. 23', ' Vladivostok', datetime.datetime(2021, 4, 21, 17, 33, 35, 172583), datetime.datetime(2021, 4, 21, 17, 33, 35, 172583)), (2, 'Valeriy', 'Golyshkin', 'Fortioneaks', '[email protected]', 'Narovchatova, bld. 8, appt. 37', 'Magadan', datetime.datetime(2021, 4, 21, 17, 54, 30, 209109), datetime.datetime(2021, 4, 21, 17, 54, 30, 209109))]
4

first()

r = conn.execute(s)
print(r.first())
print(r.first())  # this will return an error
(1, 'Dmitriy', 'Yatsenko', 'Moseend', '[email protected]', 'Shemilovskiy 2-Y Per. 8/10, appt. 23', ' Vladivostok', datetime.datetime(2021, 4, 21, 17, 33, 35, 172583), datetime.datetime(2021, 4, 21, 17, 33, 35, 172583))
Traceback (most recent call last):
...
sqlalchemy.exc.ResourceClosedError: This result object is closed.

rowcount

r = conn.execute(s)
print(r.rowcount)
# will return 6

keys()

r = conn.execute(s)
print(r.keys())
RMKeyView(['id', 'first_name', 'last_name', 'username', 'email', 'address', 'town', 'created_on', 'updated_on'])

scalar()

r = conn.execute(s)
print(r.scalar())
# will return 1
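What the fetch methods return once the result is exhausted is easy to check for yourself. The following is a minimal sketch, assuming an in-memory SQLite database and a hypothetical three-row table named demo; it is written with the positional form of select(), which SQLAlchemy 1.4 and later accept:

```python
from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, func, insert, select)

metadata = MetaData()
# Hypothetical table used only for this demonstration.
demo = Table(
    "demo", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
)
engine = create_engine("sqlite://")   # in-memory database (an assumption)
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(insert(demo), [{"name": "a"}, {"name": "b"}, {"name": "c"}])

    result = conn.execute(select(demo).order_by(demo.c.id))
    one = result.fetchone()           # first row
    two_more = result.fetchmany(2)    # the remaining two rows
    none_left = result.fetchone()     # nothing left: None
    empty_many = result.fetchmany(2)  # nothing left: empty list
    empty_all = result.fetchall()     # nothing left: empty list

    # scalar() returns the first column of the first row
    total = conn.execute(select(func.count()).select_from(demo)).scalar()

print(one, two_more, none_left, empty_many, empty_all, total)
```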

It is important to note that fetchxxx() and first() do not return tuples or dictionaries, but objects of type LegacyRow, which allows you to access data in a record using a Column name, index or Column instance. For example:

r = conn.execute(s)
row = r.fetchone()
print(row)
print(type(row))
print(row['id'], row['first_name'])  # access by column name
print(row[0], row[1])  # access by index
print(row[customers.c.id], row[customers.c.first_name])  # access via Column object
print(row.id, row.first_name)  # access as attributes

Output:

(1, 'Dmitriy', 'Yatsenko', 'Moseend', '[email protected]', 'Shemilovskiy 2-Y Per. 8/10, appt. 23', ' Vladivostok', datetime.datetime(2021, 4, 21, 17, 33, 35, 172583), datetime.datetime(2021, 4, 21, 17, 33, 35, 172583))
<class 'sqlalchemy.engine.row.LegacyRow'>
1 Dmitriy
1 Dmitriy
1 Dmitriy
1 Dmitriy
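Access by string key or Column object directly on the row is tied to the LegacyRow type. In newer SQLAlchemy releases rows behave like named tuples, and lookups by name or by Column go through the row's _mapping attribute. A minimal sketch of the portable forms, assuming an in-memory SQLite database and a hypothetical demo table:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, insert, select

metadata = MetaData()
# Hypothetical table used only for this demonstration.
demo = Table(
    "demo", metadata,
    Column("id", Integer, primary_key=True),
    Column("first_name", String(50)),
)
engine = create_engine("sqlite://")   # in-memory database (an assumption)
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(insert(demo), [{"first_name": "Dmitriy"}])
    row = conn.execute(select(demo)).fetchone()

by_index = (row[0], row[1])                                    # positional access
by_attr = (row.id, row.first_name)                             # attribute access
by_name = (row._mapping["id"], row._mapping["first_name"])     # access by column name
by_column = (row._mapping[demo.c.id], row._mapping[demo.c.first_name])  # by Column object

print(by_index, by_attr, by_name, by_column)
```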

To retrieve data from multiple tables, pass a comma-separated list of Table instances to the select() function, for example select([customers, orders]). Such a query returns the Cartesian product of the records from both tables. We'll talk about SQL JOIN separately later.
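As a minimal sketch of such a query, the following example assumes an in-memory SQLite database and two cut-down, hypothetical tables. It uses the positional form select(table1, table2), which SQLAlchemy 1.4 and later accept. SQLAlchemy may emit a warning about the Cartesian product, so warnings are silenced here:

```python
import warnings

from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, insert, select

metadata = MetaData()
# Hypothetical cut-down tables, without a foreign key between them.
demo_customers = Table(
    "customers", metadata,
    Column("id", Integer, primary_key=True),
    Column("first_name", String(50)),
)
demo_orders = Table(
    "orders", metadata,
    Column("id", Integer, primary_key=True),
    Column("customer_id", Integer),
)
engine = create_engine("sqlite://")   # in-memory database (an assumption)
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(insert(demo_customers), [{"first_name": "Dmitriy"}, {"first_name": "Valeriy"}])
    conn.execute(insert(demo_orders), [{"customer_id": 1}, {"customer_id": 1}, {"customer_id": 2}])
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")   # hide the Cartesian product warning
        rows = conn.execute(select(demo_customers, demo_orders)).fetchall()

# 2 customers x 3 orders: every customer is paired with every order
print(len(rows))
```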

Record Filtering

The where() method is used to filter records. It takes a condition and adds a WHERE statement to SELECT:

s = select([items]).where(
items.c.cost_price > 20
)
print(s)
r = conn.execute(s)
print(r.fetchall())

The query returns all items whose cost price is higher than 20. Output:

SELECT items.id, items.name, items.cost_price, items.selling_price, items.quantity
FROM items
WHERE items.cost_price > :cost_price_1
[(4, 'Travel Bag', Decimal('20.10'), Decimal('24.21'), 50),
(5, 'Keyboard', Decimal('20.12'), Decimal('22.11'), 50),
(6, 'Monitor', Decimal('200.14'), Decimal('212.89'), 50),
(7, 'Watch', Decimal('100.58'), Decimal('104.41'), 50),
(8, 'Water Bottle', Decimal('20.89'), Decimal('25.00'), 50)]

Additional conditions can be set simply by adding several calls to the where() method.

s = select([items]).\
    where(items.c.cost_price + items.c.selling_price > 50).\
    where(items.c.quantity > 10)

When conditions are chained this way, they are simply joined with AND. What about OR and NOT? There are two options:

  1. Bitwise operators
  2. Conjunctions

Bitwise operators

The bitwise operators &, | and ~ allow you to combine conditions with AND, OR or NOT operators from SQL. The previous query can be written like this with bitwise operators:

s = select([items]).\
    where(
        (items.c.cost_price + items.c.selling_price > 50) &
        (items.c.quantity > 10)
    )

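Each condition is enclosed in parentheses. This is necessary because in Python the bitwise operators bind more tightly than comparison operators such as > and ==, so without the parentheses the expression would be grouped incorrectly.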

s = select([items]).\
    where(
        (items.c.cost_price > 200) |
        (items.c.quantity < 5)
    )
print(s)

s = select([items]).\
    where(
        ~(items.c.quantity == 50)
    )
print(s)

s = select([items]).\
    where(
        ~(items.c.quantity == 50) &
        (items.c.cost_price < 20)
    )
print(s)

Output:

SELECT items.id, items.name, items.cost_price, items.selling_price, items.quantity
FROM items
WHERE items.cost_price > :cost_price_1 OR items.quantity < :quantity_1
SELECT items.id, items.name, items.cost_price, items.selling_price, items.quantity
FROM items
WHERE items.quantity != :quantity_1
SELECT items.id, items.name, items.cost_price, items.selling_price, items.quantity
FROM items
WHERE items.quantity != :quantity_1 AND items.cost_price < :cost_price_1

Conjunctions

Conditions can also be combined using the and_(), or_(), and not_() functions. This is the preferred way to add conditions in SQLAlchemy.

from sqlalchemy import select, and_, or_, not_
select([items]).\
    where(
        and_(
            items.c.quantity >= 50,
            items.c.cost_price < 100,
            not_(
                items.c.name == 'Headphone'
            ),
        )
    )
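A quick way to convince yourself that the operators and the functions are interchangeable is to compare the SQL they render. This is a small sketch that needs no database; it uses the lightweight column() construct with hypothetical column names instead of the real items table:

```python
from sqlalchemy import and_, column, not_, or_

# Stand-alone column objects; no Table or database is needed to render SQL.
cost_price = column("cost_price")
quantity = column("quantity")

# OR and NOT written with bitwise operators and with functions
or_with_operators = (cost_price > 200) | ~(quantity == 50)
or_with_functions = or_(cost_price > 200, not_(quantity == 50))

# AND written both ways
and_with_operators = (cost_price < 100) & (quantity >= 50)
and_with_functions = and_(cost_price < 100, quantity >= 50)

print(or_with_operators)
print(and_with_operators)
```

Both spellings produce the same expression, so the choice is mostly a matter of readability.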

Other common comparison operators

The following list demonstrates how to use other comparison operators when defining conditions in SQLAlchemy.

IS NULL

select([orders]).where(
orders.c.date_shipped == None
)

IS NOT NULL

select([orders]).where(
orders.c.date_shipped != None
)

IN

select([customers]).where(
    customers.c.first_name.in_(["Valeriy", "Vadim"])
)

NOT IN

select([customers]).where(
customers.c.first_name.notin_(["Valeriy", "Vadim"])
)

BETWEEN

select([items]).where(
items.c.cost_price.between(10, 20)
)

NOT BETWEEN

from sqlalchemy import not_
select([items]).where(
not_(items.c.cost_price.between(10, 20))
)

LIKE

select([items]).where(
items.c.name.like("Wa%")
)

The like() method performs a case-sensitive comparison. For case insensitive comparisons, use ilike().

NOT LIKE

select([items]).where(
not_(items.c.name.like("wa%"))
)
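To see the SQL that some of these operators render, the expressions can be converted to strings without touching a database. A minimal sketch using stand-alone column() objects with hypothetical names; it also shows the is_() and isnot() column methods as an explicit alternative to comparing with None:

```python
from sqlalchemy import column

date_shipped = column("date_shipped")
cost_price = column("cost_price")

# Comparing with None renders IS NULL / IS NOT NULL instead of = / !=
is_null = str(date_shipped == None)       # noqa: E711
is_not_null = str(date_shipped != None)   # noqa: E711

# Explicit spellings of the same conditions
is_null_explicit = str(date_shipped.is_(None))
is_not_null_explicit = str(date_shipped.isnot(None))

between = str(cost_price.between(10, 20))

print(is_null)
print(is_not_null)
print(between)
```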

Sort result with order_by

The order_by() method adds an ORDER BY clause to the SELECT statement. It accepts one or more columns to sort by. For each column, you can specify whether to sort in ascending (asc()) or descending (desc()) order. If you don't specify anything, the sorting is done in ascending order. For example:

s = select([items]).where(
items.c.quantity > 10
).order_by(items.c.cost_price)
print(s)
print(conn.execute(s).fetchall())

Output:

SELECT items.id, items.name, items.cost_price, items.selling_price, items.quantity
FROM items
WHERE items.quantity > :quantity_1 ORDER BY items.cost_price
[(3, 'Headphone', Decimal('15.52'), Decimal('16.81'), 50),
(4, 'Travel Bag', Decimal('20.10'), Decimal('24.21'), 50),
(5, 'Keyboard', Decimal('20.12'), Decimal('22.11'), 50),
(8, 'Water Bottle', Decimal('20.89'), Decimal('25.00'), 50),
(7, 'Watch', Decimal('100.58'), Decimal('104.41'), 50),
(6, 'Monitor', Decimal('200.14'), Decimal('212.89'), 50)]

The query returns records sorted by cost_price in ascending order. This is equivalent to the following:

from sqlalchemy import asc
s = select([items]).where(
items.c.quantity > 10
).order_by(asc(items.c.cost_price))
rs = conn.execute(s)
rs.fetchall()

Use desc() to sort results in descending order. Example:

from sqlalchemy import desc
s = select([items]).where(
items.c.quantity > 10
).order_by(desc(items.c.cost_price))
conn.execute(s).fetchall()

Here’s another example of sorting by two columns: quantity in ascending order and cost_price in descending order.

s = select([items]).order_by(
items.c.quantity,
desc(items.c.cost_price)
)
conn.execute(s).fetchall()

Limiting results with limit

The limit() method adds a LIMIT operator to the SELECT instruction. It takes an integer that specifies the number of records to return. For example:

s = select([items]).order_by(
items.c.quantity
).limit(2)
print(s)
print(conn.execute(s).fetchall())

Output:

SELECT items.id, items.name, items.cost_price, items.selling_price, items.quantity
FROM items ORDER BY items.quantity
LIMIT :param_1
[(2, 'Pen', Decimal('3.45'), Decimal('4.51'), 3),
(1, 'Chair', Decimal('9.21'), Decimal('10.81'), 5)]

To set an offset (the starting position) for LIMIT, use the offset() method:

s = select([items]).order_by(
items.c.quantity
).limit(2).offset(2)
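Together, limit() and offset() give simple pagination. Here is a minimal sketch, assuming an in-memory SQLite database and a hypothetical five-row table, that fetches the second page with a page size of two:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, insert, select

metadata = MetaData()
# Hypothetical table used only for this demonstration.
demo_items = Table(
    "items", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
)
engine = create_engine("sqlite://")   # in-memory database (an assumption)
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(
        insert(demo_items),
        [{"name": n} for n in ["Chair", "Pen", "Headphone", "Keyboard", "Watch"]],
    )
    # Skip the first two rows, then return the next two.
    stmt = select(demo_items.c.id).order_by(demo_items.c.id).limit(2).offset(2)
    page_ids = [row[0] for row in conn.execute(stmt).fetchall()]

print(page_ids)
```

An explicit order_by() matters here: without it the database is free to return rows in any order, and pages may overlap.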

Limiting Columns

SELECT instructions created earlier return data from all columns. You can limit the number of fields returned by the query by passing the field names as a list to select(). For example:

s = select([items.c.name, items.c.quantity]).where(
items.c.quantity == 50
)
print(s)
rs = conn.execute(s)
print(rs.keys())
print(rs.fetchall())

Output:

SELECT items.name, items.quantity
FROM items
WHERE items.quantity = :quantity_1
RMKeyView(['name', 'quantity'])
[('Headphone', 50), ('Travel Bag', 50), ('Keyboard', 50), ('Monitor', 50), ('Watch', 50), ('Water Bottle', 50)]

The query returns data only from the name and quantity columns of the items table. Similar to SQL, you can perform calculations on the returned rows before they go into the output. For example:

select([
items.c.name,
items.c.quantity,
items.c.selling_price * 5
]).where(
items.c.quantity == 50
)

Note that items.c.selling_price * 5 is not a real column, so the anonymous name anon_1 is created. You can assign a label to a column or expression using the label() method, which works by adding an AS operator to SELECT.

select([
items.c.name,
items.c.quantity,
(items.c.selling_price * 5).label('price')
]).where(
items.c.quantity == 50
)

Accessing Built-in Functions

A func object is used to access built-in database functions. The following list shows how to use functions to work with date/time, mathematical operations, and strings in a PostgreSQL database.

from sqlalchemy.sql import func
c = [
## date/time functions ##
func.timeofday(),
func.localtime(),
func.current_timestamp(),
func.date_part("month", func.now()),
func.now(),
## mathematical functions ##
func.pow(4,2),
func.sqrt(441),
func.pi(),
func.floor(func.pi()),
func.ceil(func.pi()),
## string functions ##
func.lower("ABC"),
func.upper("abc"),
func.length("abc"),
func.trim("ab c "),
func.chr(65),
]
s = select(c)
rs = conn.execute(s)
print(rs.keys())
print(rs.fetchall())
RMKeyView(['timeofday_1', 'localtime_1', 'current_timestamp_1', 'date_part_1', 'now_1', 'pow_1', 'sqrt_1', 'pi_1', 'floor_1', 'ceil_1', 'lower_1', 'upper_1', 'length_1', 'trim_1', 'chr_1'])
[('Thu Apr 22 12:33:07.655488 2021 EEST', datetime.time(12, 33, 7, 643174), datetime.datetime(2021, 4, 22, 12, 33, 7, 643174, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=180, name=None)), 4.0, datetime.datetime(2021, 4, 22, 12, 33, 7, 643174, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=180, name=None)), 16.0, 21.0, 3.14159265358979, 3.0, 4.0, 'abc', 'ABC', 3, 'ab c', 'A')]

You can also access aggregate functions through the func object.

c = [
func.sum(items.c.quantity),
func.avg(items.c.quantity),
func.max(items.c.quantity),
func.min(items.c.quantity),
func.count(items.c.id),
]
s = select(c)
rs = conn.execute(s)
print(rs.fetchall())
# output: [(1848, Decimal('38.500000000000000000'), 50, 3, 48)]

Grouping results with group_by

The grouping of results is done using the GROUP BY operator. It is often used in conjunction with aggregation functions. GROUP BY is added to SELECT using the group_by() method. The latter takes one or more columns and groups rows by the values in those columns. For example:

c = [
func.count("*").label('count'),
customers.c.town
]
s = select(c).group_by(customers.c.town)
print(conn.execute(s).fetchall())

Output:

[(1, 'Ufa'), (1, 'Irkutsk'), (2, ' Vladivostok'), (1, 'Magadan'), (1, ' Naberezhnye Chelny')]

This query returns the number of customers in each town. To filter the result based on the values of aggregate functions, use the having() method, which adds a HAVING clause to SELECT. Like where(), it takes a condition.

c = [
func.count('*').label('count'),
customers.c.town
]
s = select(c).group_by(customers.c.town).having(func.count("*") > 2)
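The difference between plain grouping and grouping with HAVING shows up clearly on a tiny data set. A minimal sketch, assuming an in-memory SQLite database and a hypothetical customers table that has only a town column:

```python
from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, func, insert, select)

metadata = MetaData()
# Hypothetical cut-down table for the demonstration.
demo_customers = Table(
    "customers", metadata,
    Column("id", Integer, primary_key=True),
    Column("town", String(50)),
)
engine = create_engine("sqlite://")   # in-memory database (an assumption)
metadata.create_all(engine)

grouped = (
    select(func.count().label("count"), demo_customers.c.town)
    .group_by(demo_customers.c.town)
    .order_by(demo_customers.c.town)
)

with engine.begin() as conn:
    conn.execute(
        insert(demo_customers),
        [{"town": t} for t in ["Vladivostok", "Magadan", "Vladivostok", "Ufa"]],
    )
    per_town = [tuple(r) for r in conn.execute(grouped)]
    # HAVING filters the groups after aggregation
    busy_towns = [tuple(r) for r in conn.execute(grouped.having(func.count() > 1))]

print(per_town)
print(busy_towns)
```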

Joins

The Table instance provides two methods for creating joins:

  1. join() – creates an inner join
  2. outerjoin() – creates an outer join (LEFT OUTER JOIN, to be exact)

An inner join returns only the rows that match the join condition, while an outer join additionally returns the rows of the left table that have no match. Both methods take a Table instance, derive the join condition from the foreign key relationship between the tables, and return a JOIN construct.

>>> print(customers.join(orders))
customers JOIN orders ON customers.id = orders.customer_id

If the methods cannot determine the join condition, or you need a different one, pass the join condition as the second argument.

customers.join(items,
customers.c.address.like(customers.c.first_name + '%')
)

When tables or columns are passed to the select() function, SQLAlchemy automatically places the corresponding tables in the FROM clause. With a join, however, the FROM clause has to be stated explicitly, and select_from() is used for that. The same method can also be used for queries that do not use joins. For example:

s = select([
customers.c.id,
customers.c.first_name
]).select_from(customers)
print(s)
rs = conn.execute(s)
print(rs.keys())
print(rs.fetchall())

Output:

SELECT customers.id, customers.first_name
FROM customers
RMKeyView(['id', 'first_name'])
[(1, 'Dmitriy'), (2, 'Valeriy'), (4, 'Vadim'), (5, 'Vladimir'), (6, 'Tatyana'), (7, 'Pavel')]

Use this knowledge to find all orders placed by user Dmitriy Yatsenko.

select([
orders.c.id,
orders.c.date_placed
]).select_from(
orders.join(customers)
).where(
and_(
customers.c.first_name == "Dmitriy",
customers.c.last_name == "Yatsenko",
)
)

The last query returns the id and date_placed of each order. It would also be useful to know the items in each order and their quantities. To get them, you need three joins, all the way down to the items table.

s = select([
orders.c.id.label('order_id'),
orders.c.date_placed,
order_lines.c.quantity,
items.c.name,
]).select_from(
orders.join(customers).join(order_lines).join(items)
).where(
and_(
customers.c.first_name == "Dmitriy",
customers.c.last_name == "Yatsenko",
)
)
print(s)
rs = conn.execute(s)
print(rs.keys())
print(rs.fetchall())

Output:

SELECT orders.id AS order_id, orders.date_placed, order_lines.quantity, items.name
FROM orders JOIN customers ON customers.id = orders.customer_id JOIN order_lines ON orders.id = order_lines.order_id JOIN items ON items.id = order_lines.item_id
WHERE customers.first_name = :first_name_1 AND customers.last_name = :last_name_1
RMKeyView(['order_id', 'date_placed', 'quantity', 'name'])
[(1, datetime.datetime(2021, 4, 22, 10, 34, 39, 548608), 5, 'Chair'),
(1, datetime.datetime(2021, 4, 22, 10, 34, 39, 548608), 2, 'Pen'),
(1, datetime.datetime(2021, 4, 22, 10, 34, 39, 548608), 1, 'Headphone'),
(2, datetime.datetime(2021, 4, 22, 10, 34, 39, 548608), 5, 'Chair'),
(2, datetime.datetime(2021, 4, 22, 10, 34, 39, 548608), 5, 'Pen')]

And here is how to create an outer join.

select([
customers.c.first_name,
orders.c.id,
]).select_from(
customers.outerjoin(orders)
)

The Table instance passed to the outerjoin() method is placed on the right side of the outer join. As a result, the last query returns all records from the customers table (the left table) and only those records from the orders table (the right table) that match the join condition. If you want all records from the orders table but only the matching ones from customers, call outerjoin() on orders instead:

select([
customers.c.first_name,
orders.c.id,
]).select_from(
orders.outerjoin(customers)
)

You can also create a FULL OUTER JOIN by passing full=True to the outerjoin() method. For example:

select([
customers.c.first_name,
orders.c.id,
]).select_from(
orders.outerjoin(customers, full=True)
)
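The difference between join() and outerjoin() is easiest to see when one customer has no orders. A minimal sketch, assuming an in-memory SQLite database and cut-down, hypothetical customers and orders tables linked by a foreign key:

```python
from sqlalchemy import (Column, ForeignKey, Integer, MetaData, String, Table,
                        create_engine, insert, select)

metadata = MetaData()
# Hypothetical cut-down tables; the foreign key lets join() infer the ON clause.
demo_customers = Table(
    "customers", metadata,
    Column("id", Integer, primary_key=True),
    Column("first_name", String(50)),
)
demo_orders = Table(
    "orders", metadata,
    Column("id", Integer, primary_key=True),
    Column("customer_id", Integer, ForeignKey("customers.id")),
)
engine = create_engine("sqlite://")   # in-memory database (an assumption)
metadata.create_all(engine)

cols = (demo_customers.c.first_name, demo_orders.c.id)
inner_stmt = (
    select(*cols)
    .select_from(demo_customers.join(demo_orders))
    .order_by(demo_orders.c.id)
)
outer_stmt = (
    select(*cols)
    .select_from(demo_customers.outerjoin(demo_orders))
    .order_by(demo_customers.c.id, demo_orders.c.id)
)

with engine.begin() as conn:
    conn.execute(insert(demo_customers), [{"first_name": "Dmitriy"}, {"first_name": "Valeriy"}])
    conn.execute(insert(demo_orders), [{"customer_id": 1}, {"customer_id": 1}])
    inner_rows = [tuple(r) for r in conn.execute(inner_stmt)]
    outer_rows = [tuple(r) for r in conn.execute(outer_stmt)]

print(inner_rows)   # only customers that have orders
print(outer_rows)   # every customer; None where there is no order
```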

Updating records

The data is updated using the update() function. For example, the following query updates selling_price and quantity for Water Bottle and sets the values to 30 and 60 respectively.

from sqlalchemy import update
s = update(items).where(
items.c.name == 'Water Bottle'
).values(
selling_price = 30,
quantity = 60,
)
print(s)
rs = conn.execute(s)

Output:

UPDATE items SET selling_price=:selling_price, quantity=:quantity WHERE items.name = :name_1

Deleting records

The delete() function is used to delete data.

from sqlalchemy import delete
s = delete(customers).where(
customers.c.username.like('Vladim%')
)
print(s)
rs = conn.execute(s)

Output:

DELETE FROM customers WHERE customers.username LIKE :username_1

This query will remove all customers whose username starts with Vladim.
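For UPDATE and DELETE, the rowcount attribute of the result reports how many rows the statement matched. A minimal sketch of both operations, assuming an in-memory SQLite database and a hypothetical cut-down items table:

```python
from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, delete, insert, select, update)

metadata = MetaData()
# Hypothetical cut-down table for the demonstration.
demo_items = Table(
    "items", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
    Column("quantity", Integer),
)
engine = create_engine("sqlite://")   # in-memory database (an assumption)
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(insert(demo_items), [
        {"name": "Water Bottle", "quantity": 50},
        {"name": "Pen", "quantity": 3},
    ])

    upd = update(demo_items).where(demo_items.c.name == "Water Bottle").values(quantity=60)
    updated_rows = conn.execute(upd).rowcount

    dele = delete(demo_items).where(demo_items.c.name.like("Pe%"))
    deleted_rows = conn.execute(dele).rowcount

    remaining = [tuple(r) for r in conn.execute(select(demo_items.c.name, demo_items.c.quantity))]

print(updated_rows, deleted_rows, remaining)
```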

Dealing with duplicates

The DISTINCT keyword is used to remove duplicate rows from the results. It can be added to SELECT with the distinct() method. For example:

# without DISTINCT
s = select([customers.c.town]).where(customers.c.id < 10)
print(s)
rs = conn.execute(s)
print(rs.fetchall())
# with DISTINCT
s = select([customers.c.town]).where(customers.c.id < 10).distinct()
print(s)
rs = conn.execute(s)
print(rs.fetchall())

Output:

SELECT customers.town
FROM customers
WHERE customers.id < :id_1
[(' Vladivostok',), ('Magadan',), (' Vladivostok',), (' Naberezhnye Chelny',), (' Ufa',), (' Irkutsk',)]
SELECT DISTINCT customers.town
FROM customers
WHERE customers.id < :id_1
[(' Vladivostok',), ('Ufa',), ('Irkutsk',), ('Magadan',), (' Naberezhnye Chelny',)]

Here's another example that uses distinct() with the aggregate function count(). It counts the number of unique towns in the customers table.

from sqlalchemy import distinct
select([
    func.count(distinct(customers.c.town)),
    func.count(customers.c.town)
])
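To see the two counts side by side, here is a minimal sketch that assumes an in-memory SQLite database and a hypothetical customers table with a single town column:

```python
from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, distinct, func, insert, select)

metadata = MetaData()
# Hypothetical cut-down table for the demonstration.
demo_customers = Table(
    "customers", metadata,
    Column("id", Integer, primary_key=True),
    Column("town", String(50)),
)
engine = create_engine("sqlite://")   # in-memory database (an assumption)
metadata.create_all(engine)

stmt = select(
    func.count(distinct(demo_customers.c.town)),   # unique towns
    func.count(demo_customers.c.town),             # all rows with a town
)

with engine.begin() as conn:
    conn.execute(
        insert(demo_customers),
        [{"town": t} for t in ["Vladivostok", "Magadan", "Vladivostok"]],
    )
    unique_towns, all_towns = tuple(conn.execute(stmt).first())

print(unique_towns, all_towns)
```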

Data conversion with cast

Converting data from one type to another is a common operation that can be done with the cast() function from the sqlalchemy library.

from sqlalchemy import func, cast, Date, DateTime, Integer, Numeric
s = select([
cast(func.pi(), Integer),
cast(func.pi(), Numeric(10,2)),
cast("2010-12-01", DateTime),
cast("2010-12-01", Date),
])
print(s)
rs = conn.execute(s)
print(rs.fetchall())

Output:

SELECT CAST(pi() AS INTEGER) AS pi, CAST(pi() AS NUMERIC(10, 2)) AS anon__1, CAST(:param_1 AS DATETIME) AS anon_1, CAST(:param_2 AS DATE) AS anon_2
[(3, Decimal('3.14'), datetime.datetime(2010, 12, 1, 0, 0), datetime.date(2010, 12, 1))]

Union

The UNION operator combines the results of multiple SELECT statements. It is created with the union() function, which takes the select() constructs to combine.

from sqlalchemy import union, desc
u = union(
select([items.c.id, items.c.name]).where(items.c.name.like("Wa%")),
select([items.c.id, items.c.name]).where(items.c.name.like("%e%")),
).order_by(desc("id"))
print(u)
rs = conn.execute(u)
print(rs.fetchall())

Output:

SELECT items.id, items.name
FROM items
WHERE items.name LIKE :name_1 UNION SELECT items.id, items.name
FROM items
WHERE items.name LIKE :name_2 ORDER BY id DESC
[(8, 'Water Bottle'), (7, 'Watch'), (5, 'Keyboard'), (4, 'Travel Bag'), (3, 'Headphone'), (2, 'Pen')]

By default, union() removes all duplicate rows from the result. Use union_all() to keep them.

from sqlalchemy import union_all, desc
union_all(
select([items.c.id, items.c.name]).where(items.c.name.like("Wa%")),
select([items.c.id, items.c.name]).where(items.c.name.like("%e%")),
).order_by(desc("id"))
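The effect on duplicates can be checked by counting the rows each variant returns. A minimal sketch, assuming an in-memory SQLite database and a hypothetical four-row items table in which one item matches both conditions:

```python
from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, insert, select, union, union_all)

metadata = MetaData()
# Hypothetical cut-down table for the demonstration.
demo_items = Table(
    "items", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
)
engine = create_engine("sqlite://")   # in-memory database (an assumption)
metadata.create_all(engine)

starts_with_wa = select(demo_items.c.id, demo_items.c.name).where(demo_items.c.name.like("Wa%"))
contains_e = select(demo_items.c.id, demo_items.c.name).where(demo_items.c.name.like("%e%"))

with engine.begin() as conn:
    conn.execute(
        insert(demo_items),
        [{"name": n} for n in ["Water Bottle", "Watch", "Pen", "Chair"]],
    )
    # "Water Bottle" matches both conditions
    union_rows = conn.execute(union(starts_with_wa, contains_e)).fetchall()
    union_all_rows = conn.execute(union_all(starts_with_wa, contains_e)).fetchall()

print(len(union_rows), len(union_all_rows))
```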

Creating subqueries

You can also retrieve data from multiple tables using subqueries. The following query returns the ID and name of the items ordered by Dmitriy Yatsenko in his first order:

s = select([items.c.id, items.c.name]).where(
items.c.id.in_(
select([order_lines.c.item_id]).select_from(customers.join(orders).join(order_lines)).where(
and_(
customers.c.first_name == 'Dmitriy',
customers.c.last_name == 'Yatsenko',
orders.c.id == 1
)
)
)
)
print(s)
rs = conn.execute(s)
print(rs.fetchall())

Output:

SELECT items.id, items.name
FROM items
WHERE items.id IN (SELECT order_lines.item_id
FROM customers JOIN orders ON customers.id = orders.customer_id JOIN order_lines ON orders.id = order_lines.order_id
WHERE customers.first_name = :first_name_1 AND customers.last_name = :last_name_1 AND orders.id = :id_1)
[(1, 'Chair'), (2, 'Pen'), (3, 'Headphone')]

The same query can also be written using joins:

select([items.c.id, items.c.name]).select_from(customers.join(orders).join(order_lines).join(items)).where(
and_(
customers.c.first_name == 'Dmitriy',
customers.c.last_name == 'Yatsenko',
orders.c.id == 1
)
)

“Raw” queries

SQLAlchemy provides the ability to execute raw SQL queries using the text() function. For example, the following SELECT statement returns all orders with their items for Dmitriy Yatsenko.

from sqlalchemy.sql import text
s = text(
"""
SELECT
orders.id as "Order ID", items.id, items.name
FROM
customers
INNER JOIN orders ON customers.id = orders.customer_id
INNER JOIN order_lines ON order_lines.order_id = orders.id
INNER JOIN items ON items.id= order_lines.item_id
where customers.first_name = :first_name and customers.last_name = :last_name
"""
)
print(s)
rs = conn.execute(s, first_name='Dmitriy', last_name='Yatsenko')
print(rs.fetchall())

Output:

SELECT
orders.id as "Order ID", items.id, items.name
FROM
customers
INNER JOIN orders ON customers.id = orders.customer_id
INNER JOIN order_lines ON order_lines.order_id = orders.id
INNER JOIN items ON items.id= order_lines.item_id
where customers.first_name = :first_name and customers.last_name = :last_name
[(1, 1, 'Chair'), (1, 2, 'Pen'), (1, 3, 'Headphone'), (2, 1, 'Chair'), (2, 2, 'Pen')]

Note that the statement includes a pair of bound parameters: first_name and last_name. The values themselves are passed to the execute() method. The text() function can also be embedded in select(). For example:

select([items]).where(
text("items.name like 'Wa%'")
).order_by(text("items.id desc"))

You can also execute raw SQL simply by passing it directly into execute(). For example:

rs = conn.execute("select * from orders;")
rs.fetchall()
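Note that newer SQLAlchemy releases expect raw SQL to be wrapped in text() and the parameter values to be passed as a dictionary rather than as keyword arguments. The following minimal sketch uses that form, which also works in SQLAlchemy 1.4. It assumes an in-memory SQLite database and a hypothetical orders table created with raw SQL:

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")   # in-memory database (an assumption)

with engine.begin() as conn:
    # Hypothetical table, created here only for the demonstration.
    conn.execute(text("CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER)"))

    # A list of dictionaries executes the statement once per dictionary.
    conn.execute(
        text("INSERT INTO orders (customer_id) VALUES (:cid)"),
        [{"cid": 1}, {"cid": 1}, {"cid": 2}],
    )

    # A single dictionary supplies the values of the bound parameters.
    rows = conn.execute(
        text("SELECT id FROM orders WHERE customer_id = :cid ORDER BY id"),
        {"cid": 1},
    ).fetchall()

order_ids = [row[0] for row in rows]
print(order_ids)
```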

Transactions

A transaction is a way to execute a set of SQL statements so that either all of them take effect or none do. If at least one statement in a transaction fails, the database reverts to the state it was in before the transaction began.

There are now two orders in the database. Dispatching an order requires two actions:

  1. Subtract the ordered quantities from the items table
  2. Set the order's date_shipped column to the current date

Both actions must be performed as a single unit to keep the data consistent. The Connection object provides the begin() method, which starts a transaction and returns the corresponding Transaction object. That object in turn provides the rollback() and commit() methods to roll back to the previous state or save the current state.

In the following listing, the dispatch_order() function takes order_id as an argument and performs the above actions within a transaction.

from datetime import datetime
from sqlalchemy import func, select, update
from sqlalchemy.exc import IntegrityError

def dispatch_order(order_id):
    # check that order_id is valid
    r = conn.execute(select([func.count("*")]).where(orders.c.id == order_id))
    if not r.scalar():
        raise ValueError("Invalid order_id: {}".format(order_id))

    # fetch the items in the order
    s = select([order_lines.c.item_id, order_lines.c.quantity]).where(
        order_lines.c.order_id == order_id
    )
    rs = conn.execute(s)
    ordered_items_list = rs.fetchall()

    # start the transaction
    t = conn.begin()
    try:
        # reduce the stock of every ordered item
        for i in ordered_items_list:
            u = update(items).where(
                items.c.id == i.item_id
            ).values(quantity = items.c.quantity - i.quantity)
            rs = conn.execute(u)
        # mark the order as shipped
        u = update(orders).where(orders.c.id == order_id).values(date_shipped=datetime.now())
        rs = conn.execute(u)
        t.commit()
        print("Transaction complete.")
    except IntegrityError as e:
        print(e)
        t.rollback()
        print("Transaction failed.")

dispatch_order(1)

The first order includes 5 chairs, 2 pens and 1 headphone. Calling dispatch_order() with order_id 1 prints this result:

Transaction complete.

The stock quantities in the items table have now been reduced accordingly. The next order contains 5 chairs and 5 pens, but only 1 chair and 1 pen are left in stock. Run dispatch_order(2) for the second order.

(psycopg2.errors.CheckViolation) ERROR: new row for relation "items" violates check constraint "quantity_check"
DETAIL: Failing row contains (1, Chair, 9.21, 10.81, -4).
[SQL: UPDATE items SET quantity=(items.quantity - %(quantity_1)s) WHERE items.id = %(id_1)s]
[parameters: {'quantity_1': 5, 'id_1': 1}]
(Background on this error at: http://sqlalche.me/e/14/gkpj)
Transaction failed.

The execution ended with an error because there are not enough chairs in stock. As a result, the database was returned to the state it was in before the transaction started.
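The all-or-nothing behaviour can be reproduced on a small scale. The sketch below is not the dispatch_order() function from the article: it assumes an in-memory SQLite database, a hypothetical items table with a CHECK constraint, and the context-manager form of engine.begin(), which commits on success and rolls back when an exception escapes the block, instead of explicit commit() and rollback() calls:

```python
from sqlalchemy import (CheckConstraint, Column, Integer, MetaData, String, Table,
                        create_engine, insert, select, update)
from sqlalchemy.exc import IntegrityError

metadata = MetaData()
# Hypothetical cut-down items table; the constraint forbids negative stock.
stock = Table(
    "items", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
    Column("quantity", Integer),
    CheckConstraint("quantity >= 0", name="quantity_check"),
)
engine = create_engine("sqlite://")   # in-memory database (an assumption)
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(insert(stock), [
        {"name": "Pen", "quantity": 3},
        {"name": "Chair", "quantity": 1},
    ])

# (item_id, ordered quantity): 2 pens are available, 5 chairs are not
wanted = [(1, 2), (2, 5)]

try:
    with engine.begin() as conn:   # one transaction for all updates
        for item_id, qty in wanted:
            conn.execute(
                update(stock)
                .where(stock.c.id == item_id)
                .values(quantity=stock.c.quantity - qty)
            )
    outcome = "committed"
except IntegrityError:
    outcome = "rolled back"

with engine.begin() as conn:
    quantities = [
        tuple(r)
        for r in conn.execute(select(stock.c.name, stock.c.quantity).order_by(stock.c.id))
    ]

print(outcome, quantities)
```

The update for the pens succeeds on its own, but because the update for the chairs violates the constraint, the whole transaction is undone and the pen quantity stays at 3.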
