PostgreSQL、FastAPI、Docker を使用してバックエンドを構築する (2023)

マップベースのアプリケーションを開発するためのステップバイステップ ガイド (パート IV)

PostgreSQL、FastAPI、Docker を使用してバックエンドを構築する (1)

に発表されました

データサイエンスに向けて

·

28 分で読めます

·

3月14日

--

PostgreSQL、FastAPI、Docker を使用してバックエンドを構築する (3)

マップは地理データを視覚化して理解するための強力なツールですが、効率的に設計するには特別なスキルが必要です。

このステップバイステップ ガイドでは、顧客に周囲のガソリン スタンドの価格を表示する地図ベースのアプリケーションの構築について詳しく説明します。オリジナルの概念実証 (POC) から実用最小限の製品 (MVP) まで、製品のさまざまな重要なステップをカバーします。

パート I: 概念実証 — 最小限のデモを構築する

パート II: React を使用して Web アプリを構築する方法 (静的レイアウト)

パート III: React を使用して Web アプリにインタラクティブ性を追加する

パート IV: PostgreSQL、FastAPI、Docker を使用してバックエンドを構築する

この記事に関するちょっとした背景

このシリーズの前の記事では、以下を使用してガソリン スタンド ファインダーのフロントエンドを構築しました。反応するそしてバックエンドは関連データのみを提供する「ブラックボックス」であると考えました。

このパートでは、次のような強力なツールを使用してバックエンドを構築する方法を段階的に詳しく説明します。PostgreSQLまたファストAPI

このプロジェクトの完全なコードは、私のファイルにあります。Githubページ

なぜクリーンなバックエンドが必要なのでしょうか?

このシリーズの最初の部分では、公共プロバイダーからガソリン スタンドから直接データをオンザフライで取得するためのユーティリティ関数をいくつか作成しました。概念実証にはこれで十分でしたが、次のような理由により、より堅牢なシステムが必要になりました。

  • パフォーマンスと遅延: XML の解析、書式設定、フィルタリングなどのデータのリアルタイム処理は計算コストが高く、頻繁に使用することが予想されるアプリケーションには現実的ではありません。
  • 信頼性: アプリケーションがサードパーティ データ ソースの予期せぬ変更やダウンタイムの影響を受けないようにするため。外部ポータルからのデータのみに依存すると、プロバイダーによるフィールド名の単純な変更でも、変更にパッチを適用する間にバグやダウンタイムが発生する可能性があるため、アプリケーションが危険にさらされることになります。独自のデータベースを構築することで、データをより適切に制御できるようになり、外部関係者に依存せずに必要な更新やメンテナンスを実行できます。
  • カスタマイズ: 独自のデータベースを使用すると、技術仕様に合わせてデータをカスタマイズしたり、他の外部データ ソースを追加したり、さまざまなユースケースに合わせてデータのカスタム ビューを構築したりできます。

これらの要件に対処するために、データの取得、処理、フロントエンドへの配信を処理できる独自のデータベースと API を構築します。これには、PostgreSQL データベースを実行することが含まれます。ドッカー、Pを使用してまだそして、ラルケミーデータベースと対話し、ポストGIS拡大。また、次を使用して単純な API を構築する方法も検討します。ファストAPISQLモデル

以下のグラフは、アプリのさまざまなコンポーネントの単純なスキーマです。

PostgreSQL、FastAPI、Docker を使用してバックエンドを構築する (4)

この記事の内容

この記事では、内部データベースと API の作成に焦点を当てます。特に、次のことを行います。

  • Docker を使用して PostgreSQL データベースを実行する
  • Python と sqlalchemy を使用してデータベースと対話する
  • PostGIS 拡張機能を使用して地理クエリを作成する
  • FastAPI と SQLmodel を使用してシンプルな API を構築する
  • プロジェクトをコンテナ化して実行するドッカー構成

Docker は、一貫した分離された環境でアプリケーションを実行できるオープンソースのコンテナ化プラットフォームです。 Docker を使用して PostgreSQL サーバーをセットアップすることには、システム上の他の構成と競合する危険を冒さずに、標準化された方法でアプリケーションをインストールできるなど、いくつかの利点があります。

この例では、Postgres サーバーをコンテナ内に直接セットアップします。

インストール方法はシステムによって異なるため、ここではコンピューターに既に Docker がインストールされていることを前提とします。

コンテナイメージを取得する

Docker イメージは、特定のタスク専用のコンテナーを構築するために必要なものすべての仕様とみなすことができます。それ自体では何も行いませんが、アプリケーションが存在するコンテナー (専用の仮想環境) を構築するために使用されます。 Dockerfile を使用して独自のカスタム イメージを作成することも (これについては後で説明します)、コミュニティで共有されているさまざまなオープンソース イメージから既製のイメージをダウンロードすることもできます。

私たちの場合、PostgreSQL を実行するコンテナーを作成するのに役立つイメージが必要です。公式画像その目的のために。

まず、Docker に PostgreSQL イメージをダウンロードします。これはシェルで行われます。

docker pull postgres

Postgreコンテナを実行する

イメージが Docker にダウンロードされたら、次のコマンドを使用して、それに基づいてコンテナーを構築できます。

docker run -itd -e POSTGRES_USER=jkaub -e POSTGRES_PASSWORD=jkaub -p 5432:5432 -v ~/db:/var/lib/postgresql/data --name station-db postgres

解読しましょう。

-itd は 3 つのパラメータの組み合わせです。

  • -d は、コンテナーをデタッチ モードで実行することを意味します。このモードでは、コンテナーがバックグラウンドで実行され、ターミナルを他の目的で引き続き使用できます。
  • -i は、コンテナーが対話モードで実行されることを指定します。これにより、コンテナ内に入り、コンテナと対話できるようになります。
  • -t は、コンテナと対話するために疑似ターミナルがコンテナ内で利用可能になることを意味します。これにより、コンテナとのよりシームレスで直感的な対話が実現します。

-e コンテナ内に環境変数を生成します。この場合、環境変数 POSTGRES_USER および POSTGRES_PASSWORD は、指定されたパスワードを持つ PostgreSQL インスタンスの新しいユーザーを生成するためにも使用されます。これを行わなくても、デフォルトのユーザー/パスワード (postgre/postgre) を使用して PostgreSQL インスタンスにアクセスできます。

-p は、ローカル マシンから Docker コンテナにポートをマップするために使用されます。 PostgreSQL に使用されるデフォルトのポートは 5432 です。ローカル マシンですでに使用されている場合は、このパラメータを使用して、コンテナからマシンの別のポートに 5432 をマッピングできます。

-v は、この場合非常に重要なパラメータです。これにより、マシン (この場合はフォルダ) からボリュームをマッピングできるようになります。~/db) SQL データがデフォルトで保存されるコンテナー内のボリューム (/var/lib/postgresql/data)。このマッピングを行うことで、コンテナーが停止した後も残る永続ボリュームが作成されます。したがって、コンテナーの使用を停止してもデータベースは保持され、後で使用できるようになります。

— name はコンテナに名前を付けるための単なるフラグであり、後でコンテナにアクセスするときに役立ちます。

マシン上で実行中のコンテナのリストを表示する以下のコマンドを使用して、コンテナがアクティブであることを確認できます。

ドッカーps

戻る:

コンテナ ID イメージ コマンド作成ステータス ポート名
cb0840806636 postgres "docker-entrypoint.s…" 2 分前 2 分前まで 0.0.0.0:5432->5432/tcp station-db

PostgreSQL インスタンスがコンテナ内で実行されるようになり、それと対話できるようになりました。

データベースを作成する

開始点として、プロジェクトのさまざまなテーブルを含む最初のデータベースを作成しましょう。

そのためには、コンテナに入る必要があります。コンテナの初期化時に -it パラメータを指定したため、これが可能であることに注意してください。以下のコマンドラインがその仕事を行います。

docker exec -it station-db bash

コマンド プロンプトは次のようになります。

root@cb0840806636:/#

つまり、コンテナ内では root ユーザーとしてログオンされます。次のように、ユーザー (-U) とパスワード (-d) を使用して PostgreSQL に接続できます。

psql -U jkaub -d jkaub

PostgreSQL インスタンスに入ると、SQL クエリを使用してインスタンスと対話でき、特に、将来のテーブルをホストする新しいデータベースを作成できます。

CREATE DATABASE ステーション。

データベースが作成されたことを確認するには、次のコマンドを実行します。

\l

これにより、システム内のさまざまなデータベースが表示されます。インスタンスの初期化時に作成されるいくつかのデフォルト データベースの中から、作成したばかりのデータベースを見つけることができます。

jkaub=# \l
データベース一覧
名前 |オーナー |エンコーディング |丁合 | Cタイプ | ICU ロケール |ロケールプロバイダー |アクセス権限
-----------+----------+----------+---------------+------ -------+-----------+---------------------+--------------- -------
駅 |かうぶ | UTF8 | en_US.utf8 | en_US.utf8 | |ライブ |

PostgreSQL インスタンスを設定したので、SQL クエリを手動で記述してデータベースと対話できるようになります。psqlテーブルを作成し、.csv ファイルからデータをインポートします。このアプローチは 1 回限りの使用には適していますが、テーブルを頻繁に更新する必要がある場合、長期的には面倒でエラーが発生しやすくなります。

したがって、自動化を容易にするために、Python フレームワークを使用してデータベースとそのテーブルを操作します。これにより、コードを使用してデータベースを簡単に作成、更新、クエリできるようになり、プロセスがより効率的になり、エラーが発生しにくくなります。

sqlalchemy でセッションを開く

SQLalchemy は、Python 開発者向けのオープンソース SQL ツールキットおよびオブジェクト リレーショナル マッパー (ORM) です。 SQL クエリを作成するのではなく、データベースと対話するための一連の高レベル関数を提案します。

これは、Python クラス (ここでは「モデル」とも呼ばれます) を使用してテーブルの構造を定義し、オブジェクト指向パラダイムで作業できるため、特に便利です。私たちのPython ORM、錬金術これは、次のパートでバックエンド API を構築するときに特に役立ちます。

プロジェクトに必要なライブラリをインストールすることから始めましょう。 sqlalchemy に加えて、次も使用します。サイココップ2これは Python 用の PostgreSQL アダプターであり、sqlalchemy によってコネクタとして使用できます。

pip install psycopg2 sqlalchemy

これで、Python で直接データベースへのセッションを効果的に作成できるようになりました。

sqlalchemyインポートcreate_engineから

Engine = create_engine('postgresql://jkaub:jkaub@localhost/stations')

# 簡単なクエリを実行して接続をテストします
Engine.connect() を conn として使用:
結果 = conn.execute('SELECT 1')
print(result.fetchone())

このスクリプトをステップごとに説明します。

Engine = create_engine('postgresql://jkaub:jkaub@localhost/stations')

create_engine メソッドは、データベースへの接続を保持するために使用されます。ここでは、データベースへの接続に必要なすべての情報を含むデータベース URL を指定する必要があります。

  • その URL postgresql の最初の部分://これは、PostgreSQL 接続を使用していることと、その後にそのタイプのデータベースの接続の仕様が続くことを指定するためのものです。 SQLite などの別のデータベースを使用している場合は、ベースも仕様も異なります。
  • ジェイカウブ: ジェイカウブ データベースに接続するためのログイン情報です。
  • ローカルホスト はデータベースを実行するサーバーです。サーバー IP を使用してリモート サーバーに接続することもできます。また、後でコンテナーのクラスターの場合に説明するように、場合によってはコンテナー名を使用することもできます。
  • /stations は、接続するデータベースを指定するために使用されます。私たちの場合は、作成したばかりの「ステーション」に接続します。
# 簡単なクエリを実行して接続をテストします
Engine.connect() を conn として使用:
結果 = conn.execute('SELECT 1')
print(result.fetchone())

コードのこの部分は、現時点では接続が正常に機能することをテストするためにのみ使用されます。データベースにはクエリするテーブルがまだないため、ダミー クエリを実行しているだけです。 (1,) が返されるはずです。これは、接続が成功したことを意味します。

PostgreSQL データベースを Docker コンテナ内にセットアップし、SQLAlchemy エンジンを使用してアクセスしたので、次はデータベースと対話するための API を開発します。

ここで API を使用すると、次のような利点があります。

  • 再利用性とプラットフォーム/言語に依存しない機能が提供され、複数のサービスが同じ API エンドポイントを使用できるようになります。
  • これにより、データベース ロジックがアプリケーション ロジックから分離され、入力/出力が尊重される限り、他方に影響を与えることなく一方を変更することが容易になります。
  • 認証システムを使用してデータベースにアクセスできるユーザーを制御できるため、セキュリティ層が追加されます。
  • 最後に、API はスケーラブルであり、複数のサーバー上で実行できるため、ワークロードを柔軟に管理できます。明確に定義された URL のセットを作成すると、API を介してデータベースからデータを取得、変更、挿入、または削除できるようになります。

FastAPIについて

ファストAPIによって開発された軽量 API の構築に特に効率的な最新の Python フレームワークです。セバスチャン・ラミレス

と組み合わせると特に効果的です錬金術卑劣な、データ検証に使用される Python ライブラリ (たとえば、日付が実際には日付であること、数値が数値であることなどを制御できます)。一緒に使用すると、フレームワークを介して直接テーブルを効果的に処理し、クエリを実行できるようになります。

さらに良いことに、セバスチャン・ラミレス別のライブラリも設計しました、SQLモデルこれは、pydantic と sqlalchemy の両方を組み合わせて、冗長性の一部を削除し、API のアーキテクチャをさらに簡素化します。

FastAPI にまだ慣れていない場合は、以下を参照することをお勧めします。このチュートリアルまず、これは本当によくできています。

プロジェクトを開始する前に、複数のライブラリをインストールする必要があります。

pip インストール uvicorn
pip インストール fastapi
pip インストール SQLmodel
pip インストール geoalchemy2
  • ユビコーンAPI サーバーを実行するツールであり、FastAPI と連携して動作するのに適しています。
  • ファスタピAPI のコア エンジンであり、さまざまなエンドポイントを作成するために使用します。
  • SQLモデルsqlalchemy ORM と pydantic の型検証機能を組み合わせます。
  • 地質化学2ジオクエリを実行するために使用される sqlalchemy の拡張機能です

モデルを初期化する

API プロジェクトの新しいリポジトリを作成しましょう。まず、モデルの定義から始めます。SQLモデル。 「モデル」とは、SQL でテーブルを表す Python クラスにすぎません。

API/
|-- アプリ/
|-- __init__.py
|-- モデル.py

私たちのプロジェクトには 3 つのテーブルがあり、構築した初期設計に従います。パート I

  • 都市に関連する情報 (郵便番号、場所) を含む 1 つのテーブル
  • ガス価格に関する情報を含む 1 つのテーブル
  • ステーションに関する情報を含む 1 つのテーブル

これらのテーブルを結合や地理フィルターと組み合わせると、フロントエンド側で要求された最終出力を構築するのに役立ちます。

最初のテーブルである Cities テーブルを見てみましょう。

from sqlmodel import フィールド、SQLModel
日時インポート日時から
import の入力から オプション

クラス都市(SQLModel、テーブル=True):
id: Optional[int] = フィールド (デフォルト = なし、primary_key = True)
郵便番号: str
名前: ストラ
緯度: 浮動小数点
ロン: 浮動小数点

クラス「Cities」はクラス SQLModel を継承し、両方の sqlalchemy を組み合わせています。ORM 機能と pydantic の入力制御。

パラメータ table=True は、対応するテーブルがデータベースにまだ存在しない場合に、列名と列の型を一致させて自動的に作成することを示します。

クラスの各属性は、各列をその型で定義します。特に、「id」が主キーになります。 Optional を使用すると、ID を指定しない場合、sqlalchemy が増分によって自動的に ID を生成することが示されます。

他の 2 つのテーブルのモデルも提供します。

日時インポート日時から

...

クラス GasPrices(SQLModel, table=True):
id: Optional[int] = フィールド (デフォルト = なし、primary_key = True)
ステーションID: str
オイル ID: str
名前: ストラ
値: 浮動小数点
maj: datetime = フィールド(default_factory=datetime.utcnow)

クラスステーション(SQLModel、テーブル=True):
station_id: str = フィールド(primary_key=True)
緯度: 浮動小数点
経度: 浮動小数点
CP: str
都市: str
アドレス: str

Stations テーブルの場合、主キーとして station_id を使用しており、GasPrices とは異なり、このフィールドは必須であることに注意してください。テーブルに送信するときにフィールドが空の場合、エラー メッセージが生成されます。

エンジンを初期化する

プロジェクトの構造を維持するための別の専用ファイルで、エンジンを開始します。そのファイルを services.py と呼びます。

API/
|-- アプリ/
|-- __init__.py
|-- モデル.py
|-- services.py

DB への接続は、前に示したものと同じ方法で行われます。

from sqlmodel import SQLModel、create_engine
モデルをインポートする

DATABASE_URL = 'postgresql://jkaub:jkaub@localhost/stations'

エンジン = create_engine(DATABASE_URL)

def create_db_and_tables():
SQLModel.metadata.create_all(エンジン)

create_db_and_tables() 関数に注意してください。この関数は API の初期化中に呼び出され、models.py で定義されたモデルを検索し、モデルが存在しない場合は SQL データベース内に直接作成します。

API を実践してみる

これで、エンドポイント (= データベースとの対話を可能にする URL) を配置するメインコンポーネントの開発を開始できます。

API/
|-- アプリ/
|-- __init__.py
|-- main.py
|-- モデル.py
|-- services.py

最初に行うことは、FastAPI の起動時に構成し、API 認証を処理することです。

fastapi.middleware.cors から CORSMiddleware をインポート
fastapi からのインポート FastAPI、HTTPException
モデルから都市、駅、ガソリン価格をインポート
サービスインポートエンジンから、create_db_and_tables

#FastAPIのインスタンスを作成します
app = FastAPI()

#ミドルウェアコンポーネントの認可を定義します
app.add_middleware(
CORSミドルウェア、
allow_origins=["http://localhost:3000"],
allow_credentials=True、
allow_methods=["*"]、
allow_headers=["*"]、
)

#テーブルがまだ存在しない場合はコールバックを使用してテーブルの作成をトリガーします
#API起動時
@app.on_event("起動")
def on_startup():
create_db_and_tables()

注意すべき重要な点: デフォルトでは、フロントには API 呼び出しを行うためのアクセス権がありません。ミドルウェア部分の設定を忘れると、フロントエンド側でエラーが発生する可能性があります。以下を使用して、すべてのオリジンを許可することを決定できます。

allow_origins=["*"]、

ただし、基本的に API はオンラインになったら世界に公開することになるため、これは安全上の理由からお勧めできません。フロントエンドは現在 localhost:3000 でローカルに実行されているため、これが許可されるドメインです。

この時点で、次のコマンド ラインを使用して API を起動できます。

uvicorn main:app --reload

「リロード」は、API の実行中に変更を保存するたびに、それらの変更を含めてリロードすることを単に意味します。

開始すると、シェルにいくつかのログが表示されます。特に、次のようなログが表示されます。

情報: http://127.0.0.1:8000 で実行中の Uvicorn (終了するには CTRL+C を押してください)

API サーバーがローカルホスト (IP 127.0.0.1 に相当)、ポート 8000 で実行されていることを示します。

前に説明したように、API を開始すると、DB 内に空のテーブルが作成されます (まだ存在しない場合)。したがって、初めて API を開始した瞬間から、table=True を指定して作成したモデルはデータベース内に専用のテーブルを持つことになります。

これは、psql の PostgresSQL コンテナ内から簡単に確認できます。メイン ユーザーとして接続したら、まずデータベース ステーションに接続します。

\c 駅

これで、テーブルが適切に作成されたことを確認できます。

\dt

返されるもの:

関係一覧
スキーマ |名前 |タイプ |オーナー
--------+---------------------+----------+----------
パブリック |都市 |テーブル |イカウブ
パブリック |ガソリン価格 |テーブル |イカウブ
パブリック |駅 |テーブル |イカウブ

また、psql で説明クエリを実行して、列がモデルと一致していることを確認することもできます (たとえば、都市に対して)。

\d 都市
コラム |タイプ |照合 | Null 可能 |デフォルト

-------------+-------------------+----------+---- ----------+----------------------
-------------
ID |整数 | | null ではない | nextval('cities_id_seq
'::regclass)
郵便番号 |文字の変化 | | null ではない |
名前 |文字の変化 | | null ではない |
緯度 |倍精度 | | null ではない |
ロン |倍精度 | | null ではない |
インデックス:
「cities_pkey」主キー、btree (id)

最初のリクエストの構築 — POST リクエストで Cities に行を追加する

Cities テーブルは 1 回だけ入力され、郵便番号と都市の緯度/経度を照合するために使用されます。これは、後で郵便番号を使用してそれらの場所をクエリする場合に特に役立ちます。

現時点では、データは .csv に保存されており、テーブルがまだデータベースにない場合は、一度に 1 行ずつ追加してテーブルを更新するために使用される POST 呼び出しを設計したいと考えています。 API 呼び出しは main.py ファイル内に置かれます。

SQLmodelインポートセッションから

...

@app.post("/add-city/")
def add_city(都市: 都市):
Session(engine) をセッションとして使用:
session.add(都市)
session.commit()
session.refresh(都市)
帰国都市

このコードを 1 行ずつ見てみましょう。

@app.post("/add-city/")

各 API エンドポイントはデコレーターを使用して定義されます。ここでは、リクエストのタイプ (get、post、put、delete…) と関連付けられた URL エンドポイント (/add-city/) の 2 つを定義しています。

この特定のケースでは、http://127.0.0.1:8000/add-city/ で POST リクエストを実行できます。

def add_city(都市: 都市):

クエリで使用するさまざまなパラメーターを関数に渡します。この場合、投稿リクエストは Cities のインスタンスを探します。これはリクエスト内の JSON 経由で渡されます。この JSON には、追加する新しい行の Cities テーブルの各列の値が含まれます。

Session(engine) をセッションとして使用:

データベースに接続するには、セッションを開きます。各クエリには独自のセッションが必要です。この方法を使用すると、セッション内で予期しないことが発生した場合に特に役立ちます。問題が発生した場合、セッションの初期化と commit() の間に行われたすべての変更がロールバックされます。

session.add(都市)
session.commit()
session.refresh(都市)

ここでは、オブジェクトがデータベースに追加されてからコミットされます。コミットされた瞬間から、操作をロールバックすることはできません。リフレッシュは、DB によって操作された変更を反映して「city」オブジェクトを更新するために使用されます。たとえば、この例では、増分「id」が自動的に追加されます。

帰国都市

オブジェクトの都市を JSON 形式で送信してリクエストを終了します。

これで、Python でリクエストを試すことができます (もちろん、これは API を実行している状態で行う必要があります)。

インポートリクエスト

url='http://127.0.0.1:8000/add-city/'

json = {
'郵便番号': '01400',
'name':"アベルジュモン クレマンシア",
「緯度」:46.1517、
'ロン':4.9306
}

req = request.post(url, json=json)

リクエストで送信している JSON のキーが、更新するテーブルの列の名前と一致していることに注意してください。パラメータ「id」はオプションであり、操作に自動的に追加されるため、特に気にする必要はありません。

これにより、API シェルで次の行がトリガーされます。

情報: 127.0.0.1:33960 - "POST /add-gas-price/ HTTP/1.1" 200 OK

リクエストが成功したことを意味します。その後、行が適切に追加されたことを確認できます。 Docker の psql に戻り、次のクエリを試してみましょう。

SELECT * 都市から LIMIT 1;

次のように表示されます:

ID |郵便番号 |名前 |緯度 |ロン
----+-------------+--------------------------+----- ----+--------
1 | 01400 |ラベルジュモン クレマンシア | 46.1517 | 4.9306

これは、行が API によって DB に効果的に追加されたことを示しています。

さらに、郵便番号を 2 回追加することは望ましくありません。これを行うには、Cities テーブルをクエリし、送信しようとしている郵便番号に基づいてテーブルをフィルタリングし、その郵便番号を含む行が見つかった場合は HTML エラーを返します。これにより、postal_code の重複が回避されます。 。

fastapi からのインポート FastAPI、HTTPException

...

@app.post("/add-city/")
async def add_city(city: 都市):
Session(engine) をセッションとして使用:

#新しいコードブロック
存在 = session.query(Cities).filter(
Cities.postal_code == city.postal_code).first()
存在する場合:
HTTPException(を発生させる)
status_code=400、detail="郵便番号はすでに存在します")
#新しいコードブロック

session.add(都市)
session.commit()
session.refresh(都市)
帰国都市

この新しいコード ブロックでは、sqlalchemy ORM を使用して最初のデータベース クエリを実行しています。従来の SQL (「SELECT FROM」) を記述する代わりに、一連の関数を使用してデータベースに直接クエリを実行しています。

存在 = session.query(Cities).filter(
Cities.postal_code == city.postal_code).first()
  • .query は SELECT … FROM … と同等です。この場合、テーブル city からすべてを選択します。
  • .filter は WHERE ステートメントと同等です。特に、送信するオブジェクトの郵便番号 (変数 city で表される) と等しいエントリを照合したいと考えています。
  • .first() は一目瞭然で、LIMIT 1 と同等です。
  • 行が見つからない場合、exist は None になり、例外は発生しないため、オブジェクトをデータベースに追加します。行が郵便番号と一致する場合、API リクエストは status_code 400 のエラーを返し、要素が追加されずに投稿リクエストは中断されます。

ここで、以前とまったく同じリクエストを送信しようとすると、API がエラー メッセージを返すことがわかります。

情報: 127.0.0.1:49076 - "POST /add-city/ HTTP/1.1" 400 不正なリクエスト

そして、行はテーブルに追加されていません。

その時点から、.csv をループし、すべての都市を 1 つずつ追加してテーブル都市を設定することができます。

POST リクエストによる Gasprices テーブルと Stations テーブルへの行の追加

これらの API 呼び出しの構築については、前の API 呼び出しと非常によく似ているため、すぐに説明します。

@app.post("/add-station/")
async def add_station(ステーション: ステーション):
Session(engine) をセッションとして使用:
存在 = session.query(ステーション).filter(
Stations.station_id == station.station_id).first()
存在する場合:
HTTPException(を発生させる)
status_code=400、detail="ステーションはすでに存在します")

session.add(ステーション)
session.commit()
session.refresh(ステーション)

折り返し駅

@app.post("/追加ガス価格/")
async def add_station(gasPrice: GasPrices):
Session(engine) をセッションとして使用:
存在 = session.query(GasPrices)。 \
フィルター(GasPrices.oil_id == GasPrice.oil_id)。 \
フィルター(GasPrices.may == GasPrice.may)。 \
初め()
存在する場合:
HTTPException(を発生させる)
status_code=400、detail="エントリはすでに存在します")

session.add(ガス価格)
session.commit()
session.refresh(ガス価格)
リターンガス価格

ここで注目すべき唯一の興味深い点は、oil_id に新しい更新があった場合にのみ行を追加するために、二重フィルター クエリを使用していることです。こうすることで、価格がある日から別の日に移動していない場合、今後の更新で重複が作成されなくなり、DB 内のスペースが節約されます。

ガス価格を取得して取り込むには、単に次の解析コードを再利用しています。パート I、対応するデータセットを取得し、それをループして各エントリに対して POST 呼び出しを行います。

以下のスクリプトは API スコープの外で実行され、データを DB にアップロードします。

インポートリクエスト
from data_parsing import get_data

BASE_API_URL = 'http://127.0.0.1:8000'

#get_data は、オープンデータから XML を取得するためにパート I で設計された関数です。
#source を作成してデータフレームに変換します
ステーション、ガス = get_data()

#押しステーションデータ
to_push = station[['latitude','longitude','cp','adress','city','station_id']].to_dict('records')

url=f'{BASE_API_URL}/add-station/'
to_push の elmt の場合:
req = request.post(url, json=elmt)

#ガソリン価格データのプッシュ
to_push = Gas.to_dict('レコード')

url=f'{BASE_API_URL}/add-gas-price/'
to_push の elmt の場合:
req = request.post(url, json=elmt)

注: ここでは、データを行ごとにプッシュすることを簡単にするために選択しました。バッチでデータをプッシュし、JSON のリストを送信するようにエンドポイントを設計することもできました。

この時点で、データベースは完全に満たされており、最終的には上記のスクリプトを使用して、より新しいデータでデータベースを更新することができ、フロントエンドで使用される GET リクエストの構築を開始して、周囲のステーションの特定の燃料の価格をクエリすることができます。特定の都市。

この特定のクエリの複雑さ (これまでに定義したすべてのテーブルを使用し、結合とジオフィルターを作成する予定) のため、また空間機能を統合するためにこの時点でいくつかの変更を行う必要があるため、この特定のクエリに完全なセクションを割り当てることにしました。データベースに追加し、アドオンをインストールし、一部のモデルを変更します。これは最初から直接行うこともできますが、実際のプロジェクトでは変更を操作するのが一般的であり、ここでどのようにしてそれをスムーズに実行できるかを示すのは興味深いと思います。

PostGIS のインストール

PostGIS は PostgreSQL の拡張機能であり、空間コンポーネントを意味する地理クエリを構築できるようになります。たとえば、この例では、対象地点から半径 30km 以内の駅からすべての行を選択できます。

ここで、実行中のコンテナに PostGIS を直接インストールすることは望ましくありません。これは、PostgreSQL のみがインストールされているイメージに基づく新しいコンテナをポップする必要があるたびに、このインストールが「失われる」ためです。

代わりに、コンテナの構築に使用しているイメージを変更し、PostgreSQL と PostGIS の両方を含むイメージに置き換えます。 DB が現在配置されている場所と同じ永続ストレージを提供するため、新しいコンテナーもそれにアクセスできるようになります。

PostGIS 拡張機能を使用してコンテナーを構築するには、まず最新の PostGIS イメージを Docker からプルし、次に現在の PostgreSQL コンテナーを強制終了して削除し、新しいイメージを使用して新しいコンテナーを構築します。

docker pull postgis
ドッカーキルステーション
docker rm ステーション
docker run -itd -e POSTGRES_USER=jkaub -e POSTGRES_PASSWORD=jkaub -p 5432:5432 -v ~/db:/var/lib/postgresql/data --name station-db postgis/postgis:latest

その後、以前と同様にコンテナにアクセスできますが、現在は PostGIS を含む PostgreSQL のバージョンを使用しています。

次に、既存のデータベースに拡張機能を追加する必要があります。まず DB に再接続します。

docker exec -it station-db bash
psql -U jkaub -d jkaub
\c 駅

次に、それに PostGIS 拡張機能を含めます。

拡張機能 postgis を作成します。

Stations モデルを変更する

PostGIS がデータベース内で稼働しているので、地理クエリを実行できるように Stations テーブルを変更する必要があります。より正確には、理解され、地球上の実際の位置に変換される「ジオメトリ」フィールドを追加する必要があります。

地図を作成したり、地球上の位置を示したりするには複数の方法があり、それぞれの方法に独自の投影法と参照座標系が使用されます。あるシステムが別のシステムと確実に通信できるようにするには、それらが同じ言語を話せるようにする必要があります。これには、単位の変換も含まれます (メートルからフィート、またはキログラムからポンドに変換するのと同じ方法)。

座標には、「Geodetic Parameter Dataset」(EPSG)と呼ばれるものを使用しています。緯度と経度 (EPSG 4326) は角度として表現され、これを距離に直接変換することはできません (距離計算を含むユークリッド幾何学は、性質上、球の表面にそのまま直接適用することはできません)。ユークリッド曲面ではありません..)。代わりに、それらを計画表現に投影する必要があります。これは、それを認識していて適切な変換を適用している限り、PostGIS で適切に処理されます。

開始点として、「幾何学的」座標として解釈できる新しいフィールドを Stations データベースに追加する必要があります。データベース内から:

ALTER TABLE ステーション ADD COLUMN geom geometry(Point, 4326);

この行は、EPSG 4326 (緯度/経度システムの EPSG) を使用して表現された、タイプ「ポイント」の PostGIS ジオムトリである新しいフィールド「geom」でステーションのテーブルを変更します。現時点ではフィールドはすべての行で空ですが、SQL で非常に簡単にフィールドに値を入力して、現在のテーブル (この時点では空ではありません) を更新できます。

UPDATE ステーション SET geom = ST_SetSRID(ST_MakePoint(経度, 緯度), 4326);

上記の SQL クエリは、経度/緯度から作成された Point を使用して Stations テーブルの各行の geom 列を設定します。ここでは、SQL でのジオメトリの定義を支援するために、ST_MakePoint と ST_SetSRID という 2 つの PostGIS 関数を使用していることに注意してください。

この新しいジオメトリが DB にどのように保存されているかを確認できます。

SELECT * FROM ステーション LIMIT 1;
駅ID |緯度 |経度 | CP |市 |住所 |ジオム
---------------+----------+----------+-----+------ ------------------------+--------------------------- ----------------------------
26110004 | 44.36 | 5,127 | 26110 |ニヨン | 31 アベニュー ドゥ ヴェンテロール | 0101000020E6100000355EBA490C821440AE47E17A142E4640

ここで、ジオメトリが Well-Known Binary (WKB) 形式の文字列にエンコードされていることがわかります。これは、ジオメトリを保存するのに効率的です。これについては詳しく説明しませんが、データセット内にこれが表示されても驚かないでください。必要に応じて、これをより読みやすい形式にデコードする必要があるかもしれません。

ここで、model.py ファイル内の Stations クラスも更新して、この新しいフィールドを含める必要があります。このために、タイプ「Geometry」を使用しています。地錬金術

import Any と入力してから
geoalchemy2.types からジオメトリをインポート

クラスステーション(SQLModel、テーブル=True):
station_id: str = フィールド(primary_key=True)
緯度: 浮動小数点
経度: 浮動小数点
CP: str
都市: str
アドレス: str
geom: オプション[任意] = フィールド(sa_column=Column(Geometry('GEOMETRY')))

最後の変更: 緯度と経度のパラメーターを使用して、POST 呼び出し (main.py 内) でジオメトリが自動的に計算されるようにします。

geoalchemy2.elements から WKTElement をインポート

@app.post("/add-station/")
async def add_station(ステーション: ステーション):
Session(engine) をセッションとして使用:
存在 = session.query(ステーション).filter(
Stations.station_id == station.station_id).first()
存在する場合:
HTTPException(を発生させる)
status_code=400、detail="ステーションはすでに存在します")

#新しいコードブロック
point = f"POINT({駅.経度} {駅.緯度})"
station.geom = WKTElement(ポイント、srid=4326)
#新しいコードブロック

session.add(ステーション)
session.commit()
session.refresh(ステーション)

#これは、適切な JSON 形式のクリーンな辞書を返すためにのみ行われます。
to_return = {}
to_return["station_id"] = station.station_id
to_return["緯度"] = 駅.緯度
to_return["経度"] = 駅.経度
to_return["cp"] = station.cp
to_return['city'] = 駅.市
to_return["住所"] = 駅.住所

return to_return

ここでは、人間が読める文字列を使用してジオメトリをエンコードする方法である WKTElement という別の形式を使用して、文字列経由でポイントを作成しています。次に、文字列は次の方法でジオメトリに変換されます。地錬金術関数 WKTElement。データベースでエンコードされる WKB 形式に暗黙的に変換します。

「geom」は JSON シリアル化できないため、API 経由でステーション オブジェクトを送り返す前に、変更するか削除する必要があることに注意してください。

最終的な GET クエリを構築する

GET クエリの目的は、郵便番号で特定された都市から半径 30 km 以内にあるすべてのステーションを取得し、一連のクエリでクエリされたすべてのステーションの特定の種類 (クエリでも使用) の最新の燃料価格を表示することです。正規化された住所や Google マップのリンクなどの整ったもの。

{
「緯度」: 49.1414、
「ロン」: 2.5087、
"都市": "オリー・ラ・ヴィル",
"駅情報": [
{
"住所": "Zi Route de Crouy 60530 Neuilly-en-Thelle",
「L あたりの価格」: 1.58、
「価格タンク」: 95、
「デルタ平均」: 25.1、
"より良い平均": 1、
"google_map_link": "https://www.google.com/maps/search/?api=1&query=Zi+Route+de+Crouy+60530+Neuilly-en-Thelle",
「距離」: 19.140224654602328、
「緯度」: 49.229、
「経度」:2.282
}, ...
]
}

これを 2 つのステップで実行します。

  • まず、結合およびフィルタリング操作を実行するための効率的な SQL クエリを構築します。
  • API 経由で結果を送信する前に、Python 関数を使用してクエリの出力を変更します。

パラメータがリクエストの本文で JSON として渡される他のクエリとは異なり、ここではクエリ パラメータが URL で直接渡される別の規則を使用します。以下の例を参照してください。

http://localhost:8000/stations/?oil_type=SP98&postal_code=60560

FastAPI では、エンドポイントの構築に使用される関数に入力を追加するだけで、非常に自然にこれが行われます。

@app.get("/stations/")
async def get_prices(oil_type: str, postal_code: str):
Session(engine) をセッションとして使用:
...

ここで最初に取得したいのは、郵便番号に関連付けられた都市の緯度と経度です。郵便番号に関連付けられている都市がない場合、API は郵便番号が見つからなかったことを示すエラー コードを返す必要があります。

city = session.query(Cities).filter(
Cities.postal_code == 郵便番号
)。初め()
都市でない場合:
HTTPException(を発生させる)
status_code=404、detail="郵便番号が見つかりません")

次に、一連のサブクエリを構築します。各サブクエリは、最後のクエリが完全に実行されるまで評価されません。これは、sqlalchemy ORM がこれらのサブクエリに基づいてクエリを動的に最適化するため、読みやすいコードを維持し、クエリを最適化するのに役立ちます。

最初に作成するサブクエリは、すでにクエリが実行されている都市の半径 30km 以内にあるステーション テーブルからすべてのステーションを選択することです。

ステーション = session.query(
ステーション.station_id、ステーション.住所、ステーション.cp、ステーション.city、
駅.緯度、駅.経度、
)。フィルター(
ST_距離(
Stations.geom.ST_GeogFromWKB()、
WKTElement(f"POINT({city.lon} {city.lat})",
srid=4326).ST_GeogFromWKB()
) < 30000).subquery()

ここで注目すべき興味深い点が数多くあります。

  • session.query( … ) では少数の列のみを選択し、フィルター処理のみを目的として行われる geom 列は保持しません。標準 SQL では、「stations から station_id、adress、cp、city、latitude、longitude を SELECT」することになります。
  • 私たちが使用しているのはST_距離、2 つの地理間の距離を計算するために使用される地錬金術の組み込み関数 (別の地錬金術タイプ)。
  • ST_Distance はジオメトリでも機能しますが、出力は角距離になります (緯度/経度は角度で表現されることに注意してください)。これは私たちが望むものではありません。
  • ジオメトリを地理に変換するには、別の組み込み関数 ST_GeoFromWKB を使用するだけで、参照系のジオメトリが地球上の点に自動的に投影されます。

次に、目的のoil_type (SP95、Gazoleなど)に基づいてGaspricesテーブルをフィルタリングします。

Price_wanted_gas = session.query(GasPrices).filter(
GasPrices.nom == 石油の種類
).サブクエリ()

また、データセットで利用可能な最新の価格に基づいて Gasprices テーブルをフィルターする必要があります。すべての更新がすべての価格に対して同時に行われるわけではないため、これは簡単な作業ではありません。サブクエリは 2 つのステップで構築します。

まず、price_wanted_gas サブテーブルから station_id と最終更新日を取得して集計を実行します。

last_price = session.query(
価格_ウォンテッド_ガス.c.ステーション_id、
func.max(price_wanted_gas.c.maj).label("max_maj")
).group_by(price_wanted_gas.c.station_id) \
.subquery()

この情報は、最新の更新価格を持つ行のみが保持される結合を介して、price_wanted_gas をフィルターするのに役立ちます。 「and_」メソッドを使用すると、結合操作で複数の条件を使用できます。

last_price_full = session.query(price_wanted_gas).join(
最終価格、
と_(
Price_wanted_gas.c.station_id == last_price.c.station_id,
Price_wanted_gas.c.maj == last_price.c.max_maj
)
).サブクエリ()

最後に、last_price_full サブテーブル (特定の燃料のすべての最新価格を含む) と station サブテーブル (半径 30 km 以内のすべてのステーションを含む) の間で最終結合を実行し、すべての結果を取得します。

station_with_price = session.query(stations, last_price_full).join(
last_price_full、
station.c.station_id == last_price_full.c.station_id
)。全て()

その時点で、GasPrices テーブルからの関連情報 (つまり、価格) と結合された関連ステーションのフィルタリングされたリストを取得しました。後はフロントの要件に合わせて出力を後処理するだけです。その時点でテーブルはすでにクリーンアップされ、フィルター処理されているため、この最後の後処理ステップは、パフォーマンスに大きな影響を与えることなく生の Python で実行できます。

この最後の後処理ステップについては、記事の核心ではないため、少しだけ詳しく説明しますが、ご自由に確認してください。GitHub詳細については、リポジトリを参照してください。

価格 = [float(e["valeur"]) for e in station_with_price]
avg_price = float(np.median(prices))

出力 = {
"緯度": 都市.緯度、
"lon": city.lon、
"都市": pretify_address(都市.名前),
"station_infos":sorted([extend_dict(x, avg_price, city.lat, city.lon) for x in station_with_price], key=lambda x: -(x['delta_average']))
}

出力を返す

これで、クエリが関連する出力を返しているかどうかをテストして確認できます。 Python のリクエストを使用してチェックすることもできますが、FastAPI には、API をテストできるすべてのエンドポイント用の組み込みドキュメントも用意されています。http://localhost:8000/docs

PostgreSQL、FastAPI、Docker を使用してバックエンドを構築する (5)

API を起動して実行できるようになったので、アプリケーションをコンテナーにパッケージ化してこの記事を終了します。

私たちのプロジェクトは次のように構成されます。

ステーションプロジェクト/
|--db/
|-- api/
|-- アプリ/
|-- 要件.txt
|-- Dockerfile
|-- update_scripts/
|-- フロント/
|-- docker-compose.yml

api/ の Dockerfile を使用して API をコンテナ化し、docker-compose を使用して API とデータベースを同時に管理します。

フォルダー db/ は、PostgreSQL コンテナーがデータベースを永続化するために使用するボリュームです。

API のパッケージ化

API をパッケージ化するには、API の実行に必要な環境と依存関係を複製する Docker イメージを構築するだけです。この Docker イメージには、コード、ランタイム、システム ツール、ライブラリ、構成など、API を実行するために必要なすべてが含まれています。

そのためには、FastAPI 環境をセットアップするための一連の命令を含む Dockerfile を作成する必要があります。原則を理解していれば、Dockerfile の作成は比較的簡単です。それは、新しいマシンを最初から構成するのと似ています。私たちの場合には:

  • 関連するバージョンの Python をインストールする必要があります
  • 作業ディレクトリをセットアップする
  • 関連するファイルを作業ディレクトリにコピーします (プロジェクトに必要なすべてのライブラリを pip インストールするために必須のrequirements.txtを含む)
  • pip install を使用してライブラリをインストールする
  • FastAPI ポートを公開する
  • API を初期化するコマンドを実行します (uvicorn main:app — reload)

これを Docker 言語に翻訳すると、次のようになります。

Pythonから:3.9

WORKDIR /コード

COPY ./requirements.txt /code/requirements.txt

コピー ./app /code/app

pip install --no-cache-dir -rrequirements.txt を実行します。

エクスポーズ80

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"]

また、使用するすべてのライブラリとそのバージョンを正確に記録するrequirement.txtファイルにも注意する必要があります。

fastpi==0.94.0
GeoAlchemy2==0.13.1
numpy==1.24.2
SQLAlchemy==1.4.41
sqlmodel==0.0.8
ユビコーン==0.20.0
psycopg2==2.9.5

これらの更新を行ったことにより、(Dockerfile を含むフォルダー内から) コンテナーのイメージを構築できるようになりました。

docker build -t fast-api-stations 。

docker-compose の使用

ドッカー構成は、マルチコンテナの Docker アプリケーションを定義および実行するためのツールです。私たちの場合は、SQL コンテナと FastAPI コンテナの両方を実行したいと考えています。コンピューターには既に docker-compose がインストールされていると仮定します。そうでない場合は、フォローしてくださいそれらの指示

docker-compose を使用するには、単にdocker-compose.ymlこのファイルは、プロジェクトのルート ディレクトリにあり、アプリケーションを構成するサービスとそれぞれの構成を定義します。

docker-compose.ymlこのファイルは YAML 構文を使用して一連のサービスを定義し、それぞれがグローバル アプリケーションの一部として実行されるコンテナーを表します。各サービスは、イメージ、ビルド コンテキスト、環境変数、永続ボリューム、ポートなどを指定できます。

docker-compose.yml は次のようになります。

バージョン:「3」

サービス:
ファスタピ:
画像: 高速 API ステーション
ポート:
- 「8000:80」

ステーションデータベース:
画像: postgis/postgis
環境:
POSTGRES_USER: jkaub
POSTGRES_PASSWORD: jkaub
POSTGRES_DB: 駅
ボリューム:
- ./db:/var/lib/postgresql/data

ご覧のとおり、2 つのサービスを定義しています。

  • 1 つは API 用で、現在は FastAPI と名付けられています。これは、前のサブセクションで作成した Docker イメージ fast-api-station 上に構築されています。このサービスでは、コンテナからポート 80 をローカル ポート 8000 に公開します。
  • 1 つは DB 用で、PostGIS イメージ上で実行されます。以前と同じ環境変数を指定し、データベースに永続化する同じボリュームを指定します。

最後に小さな変更をもう 1 つ

以前はローカル IP を使用して SQL エンジンに接続していました。現在 API と PostgreSQL を 2 つの異なる環境で実行しているため、データベースへの接続方法を切り替える必要があります。

docker-compose は、異なるコンテナ間を独自のネットワーク上で管理し、あるサービスから別のサービスに簡単に接続できるようにします。 API サービスから SQL サービスに接続するには、エンジンの作成時に接続先のサービスの名前を指定します。

DATABASE_URL = 'postgresql://jkaub:jkaub@stationdb/stations'

バックエンドの実行

すべての設定が完了したので、次のようにしてバックエンド アプリケーションを実行できます。

docker-構成アップ

API はポート 8000 経由で利用可能になります

http://localhost:8000/docs

この記事では、GasFinder アプリケーションのバックエンドに取り組んできました。

サードパーティ接続への依存に関連する可能性のあるすべての問題を回避するために、アプリケーションのすべての関連データを独自のストレージ ソリューションに保存することにしました。

Docker と PostgreSQL+PostGIS を活用して効率的な地理クエリを実行できるデータベースを構築し、Python フレームワーク FastAPI + SQLModel を使用してデータベースと対話し、開発されたフロントエンドにデータを提供するために使用できる効率的な API を構築しました。以前の記事。

現時点では、100% ローカルで実行できる「運用標準」ツール (React、PostgreSQL、FastAPI など) に基づいたプロトタイプがあります。このシリーズの最後の部分では、アプリケーションを稼働させ、SQL テーブルを自動的に更新して常に最新の情報を提供する方法を見ていきます。

References

Top Articles
Latest Posts
Article information

Author: Gregorio Kreiger

Last Updated: 06/20/2023

Views: 5299

Rating: 4.7 / 5 (77 voted)

Reviews: 84% of readers found this page helpful

Author information

Name: Gregorio Kreiger

Birthday: 1994-12-18

Address: 89212 Tracey Ramp, Sunside, MT 08453-0951

Phone: +9014805370218

Job: Customer Designer

Hobby: Mountain biking, Orienteering, Hiking, Sewing, Backpacking, Mushroom hunting, Backpacking

Introduction: My name is Gregorio Kreiger, I am a tender, brainy, enthusiastic, combative, agreeable, gentle, gentle person who loves writing and wants to share my knowledge and understanding with you.