import HighlyAvailableReads from "../../partials/pgwire/_highly_available_reads.partial.mdx"
import KnownLimitations from "../../partials/pgwire/_known_limitations.partial.mdx"
import ConnectionIssues from "../../partials/pgwire/_connection_issues.partial.mdx"
import QueryErrors from "../../partials/pgwire/_query_errors.partial.mdx"
import TimestampConfusion from "../../partials/pgwire/_timestamp_confusion.partial.mdx"

QuestDB is tested with the following Python clients:

- asyncpg
- psycopg3
- psycopg2

Other Python clients that are compatible with the PostgreSQL wire protocol should also work with QuestDB, but we do not test them. If you find a client that does not work, please open an issue.
QuestDB is a high-performance database. The PGWire protocol has many flavors, and some of them are not optimized for performance. We found psycopg2 to be the slowest of the three clients. Our recommendation is to use asyncpg or psycopg3 for the best performance when querying data.
:::tip
For data ingestion, we recommend using QuestDB's first-party clients with the InfluxDB Line Protocol (ILP) instead of PGWire. PGWire should primarily be used for querying data in QuestDB. QuestDB provides an official Python client for data ingestion using ILP.
:::
All Python PostgreSQL clients need similar connection parameters to connect to QuestDB:
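
- Host (default: localhost)
- Port (default: 8812)
- Username (default: admin)
- Password (default: quest)
- Database (default: qdb)

asyncpg is an asynchronous PostgreSQL client library designed for high performance. It uses Python's async/await syntax and is built specifically for asyncio.
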
pip install asyncpg
import asyncio
import os
import time

import asyncpg


async def connect_to_questdb():
    conn = await asyncpg.connect(
        host='127.0.0.1',
        port=8812,
        user='admin',
        password='quest',
        database='qdb'
    )

    version = await conn.fetchval("SELECT version()")
    print(f"Connected to QuestDB version: {version}")

    await conn.close()


# Set the timezone to UTC
os.environ['TZ'] = 'UTC'
if hasattr(time, 'tzset'):
    # tzset is only available on Unix-like systems
    time.tzset()

asyncio.run(connect_to_questdb())
:::note
The asyncpg client uses the system timezone by default, while QuestDB always sends timestamps in UTC.
To work in UTC, set the TZ environment variable before running your script.
This is important for time-series data to ensure consistent timestamps.

See the Timestamp Handling chapter for additional context on how QuestDB handles timezones.
:::
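
As a minimal sketch of the note above, the helpers below mark a naive `datetime` as UTC and convert it to another offset for display only. This assumes the driver returns timestamps as naive `datetime` objects that represent UTC; `as_utc` and `to_zone` are hypothetical helper names, not part of asyncpg or QuestDB.

```python
from datetime import datetime, timedelta, timezone


def as_utc(ts: datetime) -> datetime:
    """Attach UTC to a naive datetime; convert an aware one to UTC."""
    if ts.tzinfo is None:
        # Assumption: naive values read from QuestDB are already in UTC.
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)


def to_zone(ts: datetime, offset_hours: int) -> datetime:
    """Convert a timestamp to a fixed-offset zone, for display only."""
    return as_utc(ts).astimezone(timezone(timedelta(hours=offset_hours)))


row_ts = datetime(2024, 1, 15, 12, 30, 0)   # naive value, as read from a row
print(as_utc(row_ts).isoformat())           # 2024-01-15T12:30:00+00:00
print(to_zone(row_ts, 2).isoformat())       # 2024-01-15T14:30:00+02:00
```

Keeping stored and queried values in UTC and converting only at the presentation layer avoids depending on the machine's local timezone.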
asyncpg provides several methods for fetching data:

- fetch(): Returns all rows as a list of Record objects
- fetchval(): Returns a single value (first column of first row)
- fetchrow(): Returns a single row as a Record object
- cursor(): Returns an async cursor for streaming results

import asyncio
import asyncpg
from datetime import datetime, timedelta
import os
import time


async def query_with_asyncpg():
    conn = await asyncpg.connect(
        host='127.0.0.1',
        port=8812,
        user='admin',
        password='quest',
        database='qdb'
    )

    # Fetch multiple rows
    rows = await conn.fetch("""
        SELECT *
        FROM trades
        WHERE timestamp >= $1
        ORDER BY timestamp DESC LIMIT 10
    """, datetime.now() - timedelta(days=1))

    print(f"Fetched {len(rows)} rows")
    for row in rows:
        print(f"Timestamp: {row['timestamp']}, Symbol: {row['symbol']}, Price: {row['price']}")

    # Fetch a single row
    single_row = await conn.fetchrow("""
        SELECT *
        FROM trades LIMIT -1
    """)

    if single_row:
        print(f"Latest trade: {single_row['symbol']} at {single_row['price']}")

    # Fetch a single value
    count = await conn.fetchval("SELECT count(*) FROM trades")
    print(f"Total trades: {count}")

    await conn.close()


# Set the timezone to UTC
os.environ['TZ'] = 'UTC'
if hasattr(time, 'tzset'):
    time.tzset()

asyncio.run(query_with_asyncpg())
For large result sets, you can use a cursor to fetch results in batches:
import asyncio
import asyncpg
import os
import time


async def stream_with_cursor():
    conn = await asyncpg.connect(
        host='127.0.0.1',
        port=8812,
        user='admin',
        password='quest',
        database='qdb'
    )

    async with conn.transaction():
        # Execute a query that might return a large number of rows
        cursor = await conn.cursor("""
            SELECT *
            FROM trades
            ORDER BY timestamp
        """)

        batch_size = 100
        total_processed = 0

        while True:
            batch = await cursor.fetch(batch_size)

            # If no more rows, break the loop
            if not batch:
                break

            total_processed += len(batch)
            print(f"Processed {total_processed} rows so far...")

    await conn.close()
    print(f"Finished processing {total_processed} total rows")


# Set the timezone to UTC
os.environ['TZ'] = 'UTC'
if hasattr(time, 'tzset'):
    time.tzset()

asyncio.run(stream_with_cursor())
For applications that need to execute many queries, you can use connection pooling:
import asyncio
import asyncpg
import os
import time


async def connection_pool_example():
    pool = await asyncpg.create_pool(
        host='127.0.0.1',
        port=8812,
        user='admin',
        password='quest',
        database='qdb',
        min_size=5,
        max_size=20
    )

    async with pool.acquire() as conn:
        result = await conn.fetch("SELECT * FROM trades LIMIT 10")
        print(f"Fetched {len(result)} rows")

    await pool.close()


# Set the timezone to UTC
os.environ['TZ'] = 'UTC'
if hasattr(time, 'tzset'):
    time.tzset()

asyncio.run(connection_pool_example())
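
A pool pays off mainly when several queries run at the same time. The sketch below is one possible way to do that: it assumes `pool` is an object like the one returned by `asyncpg.create_pool()`, with an `acquire()` method usable in `async with`. The name `fetch_all` is a hypothetical helper, not an asyncpg API.

```python
import asyncio


async def fetch_all(pool, queries):
    """Run each query on its own pooled connection, concurrently."""

    async def run_one(sql):
        # acquire() hands out a connection and returns it to the pool on exit.
        async with pool.acquire() as conn:
            return await conn.fetch(sql)

    # gather() preserves the order of the input queries in its results.
    return await asyncio.gather(*(run_one(sql) for sql in queries))
```

With a pool created as in the example above, `await fetch_all(pool, ["SELECT count(*) FROM trades", "SELECT * FROM trades LIMIT 10"])` would return one result list per query, in the same order. The number of queries that actually run in parallel is bounded by the pool's `max_size`.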
asyncpg uses numbered parameters ($1, $2, etc.) for prepared statements:
import asyncio
import asyncpg
from datetime import datetime, timedelta
import os
import time


async def parameterized_query():
    conn = await asyncpg.connect(
        host='127.0.0.1',
        port=8812,
        user='admin',
        password='quest',
        database='qdb'
    )

    end_time = datetime.now()
    start_time = end_time - timedelta(days=7)

    rows = await conn.fetch("""
        SELECT
            symbol,
            avg(price) as avg_price,
            min(price) as min_price,
            max(price) as max_price
        FROM trades
        WHERE timestamp >= $1 AND timestamp <= $2
        GROUP BY symbol
    """, start_time, end_time)

    print(f"Found {len(rows)} symbols")
    for row in rows:
        print(f"Symbol: {row['symbol']}, Avg Price: {row['avg_price']:.2f}")

    await conn.close()


# Set the timezone to UTC
os.environ['TZ'] = 'UTC'
if hasattr(time, 'tzset'):
    time.tzset()

asyncio.run(parameterized_query())
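
When the number of values varies, for example a list of symbols to filter on, the numbered placeholders can be generated rather than typed by hand. This is a minimal sketch: `numbered_placeholders` is a hypothetical helper, and the `symbol IN (...)` filter is an assumption about the query you want to run. Only the placeholder text is interpolated into the SQL; the values themselves are still passed as parameters.

```python
def numbered_placeholders(count: int, start: int = 1) -> str:
    """Return '$start, $start+1, ...' for use inside an IN (...) list."""
    if count < 1:
        raise ValueError("count must be at least 1")
    return ", ".join(f"${i}" for i in range(start, start + count))


symbols = ["BTC-USD", "ETH-USD", "SOL-USD"]

# $1 is reserved for the timestamp bound, so the symbols start at $2.
sql = f"""
    SELECT symbol, avg(price) AS avg_price
    FROM trades
    WHERE timestamp >= $1
      AND symbol IN ({numbered_placeholders(len(symbols), start=2)})
    GROUP BY symbol
"""
print(sql)
# The statement would then run as: await conn.fetch(sql, start_time, *symbols)
```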
While we recommend using the InfluxDB Line Protocol (ILP) for ingestion, you can also use
the executemany() method to insert multiple rows with a single call. It executes the same
parameterized statement repeatedly with different sets of data. This method is significantly faster than executing
individual statements in a loop because it reduces network round-trips and allows for potential batching optimizations
by the database. In our testing, we found that executemany() can be 10x-100x faster than using a loop with execute().
It's particularly useful for bulk data insertion, such as recording multiple trades as they occur. INSERT performance
grows with the batch size, so you should experiment with the batch size to find the optimal value for your use case. We
recommend starting with a batch size of 1,000 and adjusting it based on further testing.
import asyncio
import os
import time

import asyncpg
from datetime import datetime, timedelta


async def execute_many_market_data_example():
    conn = await asyncpg.connect(
        host='127.0.0.1',
        port=8812,
        user='admin',
        password='quest',
        database='qdb'
    )

    # volume is DOUBLE because the sample rows below carry fractional volumes
    await conn.execute("""
        CREATE TABLE IF NOT EXISTS trades (
            timestamp TIMESTAMP,
            symbol SYMBOL,
            price DOUBLE,
            volume DOUBLE,
            exchange SYMBOL
        ) timestamp(timestamp) PARTITION BY DAY;
    """)

    base_timestamp = datetime.now()

    # Each tuple is one row: (timestamp, symbol, price, volume, exchange)
    trades_data = [
        (base_timestamp + timedelta(microseconds=10), 'BTC-USD', 68500.50, 0.5, 'Coinbase'),
        (base_timestamp + timedelta(microseconds=20), 'ETH-USD', 3800.20, 2.1, 'Kraken'),
        (base_timestamp + timedelta(microseconds=30), 'BTC-USD', 68501.75, 0.25, 'Binance'),
        (base_timestamp + timedelta(microseconds=40), 'SOL-USD', 170.80, 10.5, 'Coinbase'),
        (base_timestamp + timedelta(microseconds=50), 'ETH-USD', 3799.90, 1.5, 'Binance'),
    ]

    await conn.executemany("""
        INSERT INTO trades (timestamp, symbol, price, volume, exchange)
        VALUES ($1, $2, $3, $4, $5)
    """, trades_data)

    print(f"Successfully inserted {len(trades_data)} trade records using executemany.")

    await conn.close()


# Set the timezone to UTC
os.environ['TZ'] = 'UTC'
if hasattr(time, 'tzset'):
    time.tzset()

asyncio.run(execute_many_market_data_example())
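
The batch-size advice above can be sketched as a small chunking helper. The names `chunked` and `insert_in_batches` are hypothetical, and `conn` is assumed to be an open asyncpg connection; the default of 1,000 rows mirrors the starting point suggested in the text and is meant to be tuned.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Tuple


def chunked(rows: Iterable[Tuple], batch_size: int = 1000) -> Iterator[List[Tuple]]:
    """Yield consecutive lists of at most batch_size rows."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


async def insert_in_batches(conn, sql: str, rows: Iterable[Tuple],
                            batch_size: int = 1000) -> int:
    """Send rows to conn.executemany() one batch at a time; return the row count."""
    total = 0
    for batch in chunked(rows, batch_size):
        await conn.executemany(sql, batch)
        total += len(batch)
    return total
```

Because `chunked` consumes its input lazily, the rows can come from a generator, so the full data set never has to be held in memory at once.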
QuestDB, via the PostgreSQL wire protocol, supports array data types, including multidimensional arrays. asyncpg makes working with these arrays straightforward by automatically converting Python lists (or lists of lists for multidimensional arrays) into the appropriate PostgreSQL array format and vice-versa when fetching data.
:::note
Arrays are supported from QuestDB version 9.0.0.
:::

:::caution asyncpg & QuestDB Arrays: Manual Setup Needed
Using array types with asyncpg against QuestDB requires manual type registration.

asyncpg's standard type introspection query is not yet supported by QuestDB. Therefore, you must manually register a "codec" for your array types with the asyncpg connection, as shown in the code example below. This ensures correct array handling and avoids errors.

This is a temporary workaround until a permanent solution is available in asyncpg or QuestDB.
:::

When you need to insert multiple rows containing array data, such as a series of order book snapshots, executemany()
offers a more performant way to do so compared to inserting row by row with execute().
import asyncio
import os
import time

import asyncpg
from datetime import datetime, timedelta


async def batch_insert_l3_order_book_arrays():
    conn = await asyncpg.connect(
        host='127.0.0.1',
        port=8812,
        user='admin',
        password='quest',
        database='qdb'
    )

    # Workaround for asyncpg using introspection to determine array types.
    # The introspection query uses a construct that is not supported by QuestDB.
    # This is a temporary workaround before this PR or its equivalent is merged to asyncpg:
    # https://github.com/MagicStack/asyncpg/pull/1260
    arrays = [{
        'oid': 1022,
        'elemtype': 701,
        'kind': 'b',
        'name': '_float8',
        'elemtype_name': 'float8',
        'ns': 'pg_catalog',
        'elemdelim': ',',
        'depth': 0,
        'range_subtype': None,
        'attrtypoids': None,
        'basetype': None
    }]
    conn._protocol.get_settings().register_data_types(arrays)

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS l3_order_book
        (
            bid DOUBLE [][],
            ask DOUBLE [][],
            timestamp TIMESTAMP
        ) TIMESTAMP(timestamp) PARTITION BY DAY WAL;
    """)

    # Prepare a list of L3 order book snapshots for batch insertion
    snapshots_to_insert = []
    base_timestamp = datetime.now()

    # First row
    bids1 = [[68500.50, 0.5], [68500.00, 1.2], [68499.50, 0.3]]
    asks1 = [[68501.00, 0.8], [68501.50, 0.4], [68502.00, 1.1]]
    ts1 = base_timestamp + timedelta(seconds=1)
    snapshots_to_insert.append((bids1, asks1, ts1))

    # Second row
    bids2 = [[68502.10, 0.3], [68501.80, 0.9], [68501.20, 1.5]]
    asks2 = [[68502.50, 1.1], [68503.00, 0.6], [68503.50, 0.2]]
    ts2 = base_timestamp + timedelta(seconds=2)
    snapshots_to_insert.append((bids2, asks2, ts2))

    # Third row
    bids3 = [[68490.60, 2.5], [68489.00, 3.2]]
    asks3 = [[68491.20, 1.8], [68492.80, 0.7]]
    ts3 = base_timestamp + timedelta(seconds=3)
    snapshots_to_insert.append((bids3, asks3, ts3))

    print(f"Prepared {len(snapshots_to_insert)} snapshots for batch insertion.")

    # Insert the snapshots into the database in a single batch
    await conn.executemany(
        """
        INSERT INTO l3_order_book (bid, ask, timestamp)
        VALUES ($1, $2, $3)
        """,
        snapshots_to_insert  # List of tuples, each tuple is a row
    )
    print(f"Successfully inserted {len(snapshots_to_insert)} L3 order book snapshots using executemany().")

    await conn.close()


# Set the timezone to UTC
os.environ['TZ'] = 'UTC'
if hasattr(time, 'tzset'):
    time.tzset()

asyncio.run(batch_insert_l3_order_book_arrays())
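
Before sending nested lists as arrays, it can help to check that they are rectangular, meaning every sub-list at a given depth has the same length. The sketch below works under the assumption that ragged arrays are rejected by the server; `array_shape` is a hypothetical helper, not part of asyncpg or QuestDB.

```python
from typing import Sequence, Tuple


def array_shape(values: Sequence) -> Tuple[int, ...]:
    """Return the shape of a nested list, raising ValueError if it is ragged."""
    if not isinstance(values, (list, tuple)):
        return ()                      # a scalar has an empty shape
    if not values:
        return (0,)
    # Every element must report the same shape for the array to be rectangular.
    inner_shapes = {array_shape(item) for item in values}
    if len(inner_shapes) != 1:
        raise ValueError(f"ragged array: inner shapes {sorted(inner_shapes)}")
    return (len(values),) + inner_shapes.pop()


bids = [[68500.50, 0.5], [68500.00, 1.2], [68499.50, 0.3]]
print(array_shape(bids))               # (3, 2)
```

Running this check over each row before calling executemany() surfaces a malformed snapshot on the client side instead of partway through a batch.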
asyncpg uses the binary protocol by default, which improves performance by avoiding text encoding/decoding for data transfer.
psycopg3 is the latest major version of the popular psycopg PostgreSQL adapter. It's a complete rewrite of psycopg2 with support for both synchronous and asynchronous operations.
pip install psycopg[binary]
:::tip
Use psycopg[binary] to install the binary version of psycopg3, which is faster and more efficient. In our testing, we
found that the binary version of psycopg3 is significantly faster than the pure-Python version.
:::
We recommend always passing binary=True to the cursor() method to use the binary protocol for better performance.
This is especially important for large datasets and array types.
import psycopg

conn = psycopg.connect(
    host='127.0.0.1',
    port=8812,
    user='admin',
    password='quest',
    dbname='qdb',
    autocommit=True  # Important for QuestDB
)

with conn.cursor(binary=True) as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()
    print(f"Connected to QuestDB version: {version[0]}")

conn.close()
psycopg3 provides several methods for fetching data:

- fetchall(): Returns all rows as a list of tuples
- fetchone(): Returns a single row as a tuple
- fetchmany(size): Returns a specified number of rows

import psycopg
from datetime import datetime, timedelta

with psycopg.connect(
        host='127.0.0.1',
        port=8812,
        user='admin',
        password='quest',
        dbname='qdb',
        autocommit=True
) as conn:
    with conn.cursor(binary=True) as cur:
        end_time = datetime.now()
        start_time = end_time - timedelta(days=1)

        cur.execute("""
            SELECT *
            FROM trades
            WHERE timestamp >= %s
              AND timestamp <= %s
            ORDER BY timestamp DESC LIMIT 10
        """, (start_time, end_time))

        rows = cur.fetchall()
        print(f"Fetched {len(rows)} rows")

        for row in rows:
            print(f"Timestamp: {row[0]}, Symbol: {row[1]}, Price: {row[2]}")
psycopg3 allows you to specify how rows are returned using row factories:
import psycopg

with psycopg.connect(
        host='127.0.0.1',
        port=8812,
        user='admin',
        password='quest',
        dbname='qdb',
        autocommit=True
) as conn:
    # Rows as dictionaries keyed by column name
    with conn.cursor(row_factory=psycopg.rows.dict_row, binary=True) as cur:
        cur.execute("SELECT * FROM trades LIMIT 5")
        rows = cur.fetchall()
        for row in rows:
            print(f"Symbol: {row['symbol']}, Price: {row['price']}")

    # Rows as named tuples with attribute access
    with conn.cursor(row_factory=psycopg.rows.namedtuple_row, binary=True) as cur:
        cur.execute("SELECT * FROM trades LIMIT 5")
        rows = cur.fetchall()
        for row in rows:
            print(f"Symbol: {row.symbol}, Price: {row.price}")
For large result sets, you can process rows in fixed-size batches with fetchmany():
import psycopg

with psycopg.connect(
        host='127.0.0.1',
        port=8812,
        user='admin',
        password='quest',
        dbname='qdb',
        autocommit=True
) as conn:
    with conn.cursor(binary=True) as cur:
        # Execute a query that might return many rows
        cur.execute("SELECT * FROM trades")

        batch_size = 1000
        total_processed = 0

        while True:
            batch = cur.fetchmany(batch_size)

            if not batch:
                break

            total_processed += len(batch)
            if total_processed % 10000 == 0:
                print(f"Processed {total_processed} rows so far...")

        print(f"Finished processing {total_processed} total rows")
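
The fetchmany() loop above can be wrapped in a generator so that calling code only sees batches. This is a minimal sketch: `iter_batches` is a hypothetical helper, and it assumes only that `cursor` has already executed a query and exposes a DB-API style `fetchmany(size)` method, as psycopg3 and psycopg2 cursors do.

```python
from typing import Any, Iterator, List, Tuple


def iter_batches(cursor: Any, batch_size: int = 1000) -> Iterator[List[Tuple]]:
    """Yield lists of rows from an executed cursor until it is exhausted."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            return
        yield batch


# Usage with an open cursor (not executed here, since it needs a server):
#     cur.execute("SELECT * FROM trades")
#     for batch in iter_batches(cur, 1000):
#         handle(batch)   # handle() stands in for your own processing
```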
psycopg3 uses placeholder parameters (%s) for prepared statements:
import psycopg
from datetime import datetime, timedelta

with psycopg.connect(
        host='127.0.0.1',
        port=8812,
        user='admin',
        password='quest',
        dbname='qdb',
        autocommit=True
) as conn:
    with conn.cursor(binary=True) as cur:
        # Define query parameters
        end_time = datetime.now()
        start_time = end_time - timedelta(days=7)

        # Execute a parameterized query
        cur.execute("""
            SELECT symbol,
                   avg(price) as avg_price,
                   min(price) as min_price,
                   max(price) as max_price
            FROM trades
            WHERE timestamp >= %s
              AND timestamp <= %s
            GROUP BY symbol
        """, (start_time, end_time))

        rows = cur.fetchall()
        for row in rows:
            print(f"Symbol: {row[0]}, Avg Price: {row[1]:.2f}")
psycopg3 supports async operations:
import asyncio
import psycopg


async def async_psycopg3():
    async with await psycopg.AsyncConnection.connect(
            host='127.0.0.1',
            port=8812,
            user='admin',
            password='quest',
            dbname='qdb',
            autocommit=True
    ) as aconn:
        async with aconn.cursor(binary=True) as acur:
            await acur.execute("SELECT * FROM trades LIMIT 10")

            rows = await acur.fetchall()
            print(f"Fetched {len(rows)} rows")
            for row in rows:
                print(row)


asyncio.run(async_psycopg3())
QuestDB, via the PostgreSQL wire protocol, supports array data types, including multidimensional arrays.
When you need to insert multiple rows containing array data, such as a series of order book snapshots, executemany()
offers a more performant way to do so compared to inserting row by row with execute().
:::tip
For data ingestion, we recommend using QuestDB's first-party clients with the InfluxDB Line Protocol (ILP) instead of PGWire. PGWire should primarily be used for querying data in QuestDB.

If you cannot use ILP for some reason, you should prefer asyncpg over psycopg3 for performance reasons. We found that asyncpg is significantly faster than psycopg3 when inserting batches of data including arrays.
:::

import asyncio
import os
import time
from datetime import datetime, timedelta

import psycopg


async def batch_insert_l3_order_book_arrays():
    conn_str = "host=127.0.0.1 port=8812 dbname=qdb user=admin password=quest"
    async with await psycopg.AsyncConnection.connect(conn_str) as conn:
        async with conn.cursor(binary=True) as cur:
            await cur.execute("""
                CREATE TABLE IF NOT EXISTS l3_order_book
                (
                    bid DOUBLE [][],
                    ask DOUBLE [][],
                    timestamp TIMESTAMP
                ) TIMESTAMP(timestamp) PARTITION BY DAY WAL;
            """)
            print("Table 'l3_order_book' is ready.")

            # Prepare a list of L3 order book snapshots for batch insertion
            snapshots_to_insert = []
            base_timestamp = datetime.now()

            # First row
            bids1 = [[68500.50, 0.5], [68500.00, 1.2], [68499.50, 0.3]]
            asks1 = [[68501.00, 0.8], [68501.50, 0.4], [68502.00, 1.1]]
            ts1 = base_timestamp + timedelta(seconds=1)
            snapshots_to_insert.append((bids1, asks1, ts1))

            # Second row
            bids2 = [[68502.10, 0.3], [68501.80, 0.9], [68501.20, 1.5]]
            asks2 = [[68502.50, 1.1], [68503.00, 0.6], [68503.50, 0.2]]
            ts2 = base_timestamp + timedelta(seconds=2)
            snapshots_to_insert.append((bids2, asks2, ts2))

            # Third row
            bids3 = [[68490.60, 2.5], [68489.00, 3.2]]
            asks3 = [[68491.20, 1.8], [68492.80, 0.7]]
            ts3 = base_timestamp + timedelta(seconds=3)
            snapshots_to_insert.append((bids3, asks3, ts3))

            print(f"Prepared {len(snapshots_to_insert)} snapshots for batch insertion.")

            # Insert the snapshots into the database in a single batch
            await cur.executemany(
                """
                INSERT INTO l3_order_book (bid, ask, timestamp)
                VALUES (%b, %b, %b)
                """,
                snapshots_to_insert
            )
            print(f"Successfully inserted {cur.rowcount} L3 order book snapshots using executemany().")


if __name__ == "__main__":
    # Set timezone to UTC
    os.environ['TZ'] = 'UTC'
    if hasattr(time, 'tzset'):
        time.tzset()

    asyncio.run(batch_insert_l3_order_book_arrays())
The example above uses %b as placeholder parameters instead of the usual %s placeholder. This is because
%b format forces the use of the binary protocol. By default, psycopg3 uses the text encoding for array
parameters. The binary protocol is more efficient for transferring large amounts of data, especially for
arrays. The binary protocol is also the default for asyncpg, which is why we recommend using asyncpg for
data ingestion over PGWire.
psycopg3 provides connection pooling capabilities. This reduces the overhead of establishing new connections
and allows for efficient reuse of existing connections. The feature requires the psycopg_pool package.
pip install psycopg_pool
from psycopg_pool import ConnectionPool

pool = ConnectionPool(
    min_size=5,
    max_size=20,
    kwargs={
        'host': '127.0.0.1',
        'port': 8812,
        'user': 'admin',
        'password': 'quest',
        'dbname': 'qdb',
        'autocommit': True
    }
)

with pool.connection() as conn:
    with conn.cursor(binary=True) as cur:
        cur.execute("SELECT * FROM trades LIMIT 10")
        rows = cur.fetchall()
        print(f"Fetched {len(rows)} rows")

pool.close()
psycopg2 is a mature PostgreSQL adapter for Python. While not as performant as asyncpg or psycopg3, it's widely used and has excellent compatibility. This driver is recommended as a last resort for QuestDB if asyncpg or psycopg3 are not working for you.
pip install psycopg2-binary
import psycopg2

conn = psycopg2.connect(
    host='127.0.0.1',
    port=8812,
    user='admin',
    password='quest',
    dbname='qdb'
)
conn.autocommit = True

with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()
    print(f"Connected to QuestDB version: {version[0]}")

conn.close()
psycopg2 provides several methods for fetching data:

- fetchall(): Returns all rows as a list of tuples
- fetchone(): Returns a single row as a tuple
- fetchmany(size): Returns a specified number of rows

import psycopg2
from datetime import datetime, timedelta

conn = psycopg2.connect(
    host='127.0.0.1',
    port=8812,
    user='admin',
    password='quest',
    dbname='qdb'
)
conn.autocommit = True

try:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM trades LIMIT 10")
        rows = cur.fetchall()
        print(f"Fetched {len(rows)} rows")
        for row in rows:
            print(f"Timestamp: {row[0]}, Symbol: {row[1]}, Price: {row[2]}")

        # Fetch one row at a time
        cur.execute("SELECT * FROM trades LIMIT 5")
        print("\nFetching one row at a time:")
        row = cur.fetchone()
        while row:
            print(row)
            row = cur.fetchone()

        # Fetch many rows at a time
        cur.execute("SELECT * FROM trades LIMIT 100")
        print("\nFetching 10 rows at a time:")
        while True:
            rows = cur.fetchmany(10)
            if not rows:
                break
            print(f"Batch of {len(rows)} rows")
finally:
    conn.close()
psycopg2 provides dictionary cursors to access rows by column name:
import psycopg2.extras

conn = psycopg2.connect(
    host='127.0.0.1',
    port=8812,
    user='admin',
    password='quest',
    dbname='qdb'
)
conn.autocommit = True

try:
    # DictCursor rows support access by column name and by index
    with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        cur.execute("SELECT * FROM trades LIMIT 5")
        rows = cur.fetchall()
        for row in rows:
            print(f"Symbol: {row['symbol']}, Price: {row['price']}")

    # RealDictCursor rows are plain dictionaries
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SELECT * FROM trades LIMIT 5")
        rows = cur.fetchall()
        for row in rows:
            print(row)  # Prints as a dict
finally:
    conn.close()
For large result sets, you can process rows in fixed-size batches with fetchmany():
import psycopg2

conn = psycopg2.connect(
    host='127.0.0.1',
    port=8812,
    user='admin',
    password='quest',
    dbname='qdb'
)
conn.autocommit = True

try:
    with conn.cursor() as cur:
        # Execute a query that might return many rows
        cur.execute("SELECT * FROM trades")

        batch_size = 1000
        total_processed = 0

        while True:
            batch = cur.fetchmany(batch_size)

            # If no more rows, break the loop
            if not batch:
                break

            total_processed += len(batch)
            if total_processed % 10000 == 0:
                print(f"Processed {total_processed} rows so far...")

        print(f"Finished processing {total_processed} total rows")
finally:
    conn.close()
psycopg2 uses placeholder parameters (%s) for parameterized queries:
import psycopg2
from datetime import datetime, timedelta

conn = psycopg2.connect(
    host='127.0.0.1',
    port=8812,
    user='admin',
    password='quest',
    dbname='qdb'
)
conn.autocommit = True

try:
    with conn.cursor() as cur:
        end_time = datetime.now()
        start_time = end_time - timedelta(days=7000)

        cur.execute("""
            SELECT symbol,
                   avg(price) as avg_price,
                   min(price) as min_price,
                   max(price) as max_price
            FROM trades
            WHERE timestamp >= %s
              AND timestamp <= %s
            GROUP BY symbol
        """, (start_time, end_time))

        rows = cur.fetchall()
        for row in rows:
            print(f"Symbol: {row[0]}, Avg Price: {row[1]:.2f}")
finally:
    conn.close()
For connection pooling with psycopg2, you can use the psycopg2.pool module that ships with the library:
from psycopg2.pool import ThreadedConnectionPool

pool = ThreadedConnectionPool(
    minconn=5,
    maxconn=20,
    host='127.0.0.1',
    port=8812,
    user='admin',
    password='quest',
    dbname='qdb'
)

conn = pool.getconn()
try:
    conn.autocommit = True
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM trades LIMIT 10")
        rows = cur.fetchall()
        print(f"Fetched {len(rows)} rows")
finally:
    # Always hand the connection back, even if the query fails
    pool.putconn(conn)

pool.closeall()
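
The getconn()/putconn() pairing above is easy to get wrong once there are several call sites. One possible way to keep it in one place is a small context manager; `pooled_connection` is a hypothetical helper, and `pool` is assumed to be any object with psycopg2-style `getconn()` and `putconn()` methods.

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool, autocommit=True):
    """Borrow a connection from a psycopg2-style pool and always return it."""
    conn = pool.getconn()
    try:
        conn.autocommit = autocommit
        yield conn
    finally:
        # Runs on success and on error, so the connection is never leaked.
        pool.putconn(conn)


# Usage with the pool from the example above (needs a running server):
#     with pooled_connection(pool) as conn:
#         with conn.cursor() as cur:
#             cur.execute("SELECT * FROM trades LIMIT 10")
```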
psycopg2 integrates with pandas over SQLAlchemy, allowing you to read data directly into a DataFrame. This feature
requires the pandas, sqlalchemy, and questdb-connect packages.
pip install pandas sqlalchemy questdb-connect
:::note
This example shows how to use the SQLAlchemy engine with psycopg2 for querying QuestDB into a pandas DataFrame. For ingestion from Pandas to QuestDB see our pandas ingestion guide.
:::
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime, timedelta

engine = create_engine("questdb://admin:quest@localhost:8812/qdb")

with engine.connect() as conn:
    end_time = datetime.now()
    start_time = end_time - timedelta(days=10000)

    query = """
        SELECT *
        FROM trades
        WHERE timestamp >= %s
          AND timestamp <= %s
        ORDER BY timestamp
    """

    # Execute the query directly into a pandas DataFrame
    df = pd.read_sql(query, conn, params=(start_time, end_time))

    print(f"DataFrame shape: {df.shape}")
    print(f"DataFrame columns: {df.columns.tolist()}")
    print(f"Sample data:\n{df.head()}")
For optimal query performance with QuestDB:
QuestDB provides specialized time-series functions that work with all PGWire clients:
SAMPLE BY is used for time-based downsampling:
SELECT timestamp,
       symbol,
       avg(price) as avg_price,
       min(price) as min_price,
       max(price) as max_price
FROM trades
WHERE timestamp >= dateadd('d', -7, now())
SAMPLE BY 1h;
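
The SAMPLE BY interval is part of the SQL text, so it is interpolated into the statement rather than passed as a bind parameter. The sketch below shows one cautious way to build the query above from Python. `sample_by_query` is a hypothetical helper, and its pattern deliberately accepts only a small subset of intervals (a positive integer followed by s, m, h, or d) as an assumption about what your application needs.

```python
import re

# Positive integer followed by one unit: seconds, minutes, hours, or days.
_INTERVAL = re.compile(r"^[1-9][0-9]*[smhd]$")


def sample_by_query(interval: str, days_back: int = 7) -> str:
    """Build the SAMPLE BY query for a validated interval such as '15m'."""
    if not _INTERVAL.match(interval):
        raise ValueError(f"unsupported interval: {interval!r}")
    if not isinstance(days_back, int) or days_back < 1:
        raise ValueError("days_back must be a positive integer")
    return (
        "SELECT timestamp, symbol, avg(price) AS avg_price, "
        "min(price) AS min_price, max(price) AS max_price "
        "FROM trades "
        f"WHERE timestamp >= dateadd('d', -{days_back}, now()) "
        f"SAMPLE BY {interval}"
    )


print(sample_by_query("1h"))
```

Validating the interval before formatting keeps user-supplied input from altering the statement; the resulting string can be passed to any of the clients shown earlier.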
LATEST ON is an efficient way to get the most recent values:
SELECT *
FROM trades
WHERE timestamp IN today()
LATEST ON timestamp PARTITION BY symbol;
QuestDB's support for the PostgreSQL Wire Protocol allows you to use a variety of Python clients to query your time-series data, including asyncpg, psycopg3, and psycopg2.
For most use cases, we recommend using asyncpg or psycopg3 for better performance. For data ingestion, consider using QuestDB's first-party clients with the InfluxDB Line Protocol (ILP) for maximum throughput.