Python SDK对接向量数据库

本文介绍如何使用Python语言调用openGauss向量数据库

环境准备

限制:python3.11及以上

  • 在线安装
pip3 install psycopg2
  • 离线安装

开发者可以直接到pypi官网下载psycopg2并安装。

pip3 install YourPath/psycopg2.whl

基本操作

1.连接数据库

import psycopg2
from psycopg2 import sql
import numpy as np
from typing import List

def create_connection(dbname:str, user:str, password:str, host:str, port:int):
    conn = psycopg2.connect(
        dbname = dbname,
        user = user,
        password = password,
        host = host,
        port = port
    )
    cursor = conn.cursor()
    return conn, cursor

2.创建表

def create_table(conn, cursor, table_name:str, dim:int):
    cursor.execute(
        sql.SQL(
            "CREATE TABLE IF NOT EXISTS public.{table_name} (id BIGINT PRIMARY KEY, embedding vector({dim}));"
        ).format(table_name = sql.Identifier(table_name), dim = sql.Literal(dim))
    )
    conn.commit()

3.创建索引

def create_index(conn, cursor, table_name:str, index_name:str):
    cursor.execute(
        sql.SQL(
            """
            CREATE INDEX IF NOT EXISTS {index_name} ON public.{table_name}
            USING hnsw (embedding vector_l2_ops);
            """
        ).format(index_name = sql.Identifier(index_name), table_name = sql.Identifier(table_name))
    )
    conn.commit()

4.插入/删除/更新数据

  • 批量插入
def insert(conn, cursor, table_name:str, embeddings:List[List[float]], ids:List[int]):
    data = list(zip(ids, embeddings))
    cursor.executemany(
        sql.SQL("INSERT INTO public.{table_name} (id, embedding) VALUES(%s, %s);")
        .format(table_name = sql.Identifier(table_name)), data
    )
    conn.commit()
  • 删除
def delete(conn, cursor, table_name:str, ids:List[int]):
    cursor.execute(
        sql.SQL(
            "DELETE FROM public.{table_name} WHERE id IN ({ids});"
        ).format(table_name = sql.Identifier(table_name), ids = sql.SQL(',').join(map(sql.Literal, ids)))
    )
    delete_count = cursor.rowcount
    conn.commit()
    return delete_count
  • 更新
def update(conn, cursor, table_name:str, id:int, vector:List[List[float]]):
    cursor.execute(
        sql.SQL(
            "UPDATE public.{table_name} SET embedding = %s WHERE id = %s;"
        ).format(table_name = sql.Identifier(table_name)), (vector ,id)
    )
    conn.commit()

5.查询

# 串行批量查询
def select(conn, cursor, table_name:str, queries:List[List[float]], topk:int):
    ids = []
    for emb in queries:
        cursor.execute(
            sql.SQL(
                "SELECT * FROM public.{table_name} ORDER BY embedding <-> %s::vector LIMIT %s::int;"
            ).format(table_name = sql.Identifier(table_name)), (emb, topk)
        )
        conn.commit()
        result = cursor.fetchall()
        ids.append([int(i[0]) for i in result])
    return ids

6.删除表

def drop_table(conn, cursor, table_name:str):
    cursor.execute(
        sql.SQL(
            "DROP TABLE IF EXISTS public.{table_name};"
        ).format(table_name = sql.Identifier(table_name))
    )
    conn.commit()

7.关闭连接

def close_connection(conn, cursor):
    conn.close()
    cursor.close()

用例

conn, cursor = create_connection("testdb", "test_user", YourPassword, "localhost", 5432)
create_table(conn, cursor, "test_table1", 3)
create_index(conn, cursor, "test_table1", "idx_test1")
insert(conn, cursor, "test_table1", [[1.2, 3, 5], [4.3, 5.2, 1]], [0, 1])
delete(conn, cursor, "test_table1", [0])
update(conn, cursor, "test_table1", 1, [1, 3, 3])
select(conn, cursor, "test_table1", [[1, 2, 2], [3, 5, 1]], 2)
drop_table(conn, cursor, "test_table1")
close_connection(conn, cursor)
意见反馈
编组 3备份
    openGauss 2025-04-26 22:53:55
    取消