刹那(せつな)の瞬き

Willkömmen! Ich heiße Setsuna. Haben Sie etwas Zeit für mich?

RustでSQLServerへの接続プールにdeadpoolを試してみた

以前、PostgreSQL で非同期な接続プールに deadpool を利用したことがあります。

PostgreSQL への接続では deadpool を直接扱うのではなく、deadpool-postgres (w/ tokio-postgres) が用意されているので、そちらを利用します。

deadpool 自体は汎用的な接続プールなので、tiberius と組み合わせて、構造体やトレイトを実装すれば、SQLServer への接続プールとして利用できるのですが、今のところ crates.io で探しても該当するものはありません。

私の場合、deadpool の Managed pool が利用できれば十分なので、とりあえず tokio ランタイムで動作するようにしてみました。

1. 環境とデータベース

Rustでasync/awaitに対応したTiberiusからSQLServerに接続する

※ここで構築した環境を引き続き利用します。

  • OS: KDE neon 5.21.5 (Ubuntu 20.04 ベース) / macOS Mojave v10.14.6
  • rustc 1.52.1
  • SQL Server 2019 (RTM-CU10) (KB5001090) - 15.0.4123.1 (X64) on Linux
  • データベース: my_test_db

2. プロジェクト

(1) 準備

適当なディレクトリにプロジェクトを作成します。(今回は ~/work/pool_tokio)

$ cd ~/work
$ cargo new pool_tokio
$ cd pool_tokio
(2) Cargo.toml の編集

[dependencies] セクションに下記の内容を追加します。

[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.6", features = ["compat"] }
async-trait = "0.1"
deadpool = "0.8"
tiberius = { version = "0.5", features = ["chrono"] }
chrono = "0.4"
futures = "0.3"
(3) ソースコード: src/mssql.rs

プロジェクトの src ディレクトリに下記のソースコードmssql.rs として保存します。 

use async_trait::async_trait;
use tokio_util::compat::TokioAsyncWriteCompatExt;

pub struct Manager {
    config: tiberius::Config,
}

impl Manager {
    pub fn new(config: tiberius::Config) -> Self {
        Self { config }
    }
}

type Client = tiberius::Client<tokio_util::compat::Compat<tokio::net::TcpStream>>;

pub struct ClientWrapper {
    client: Client,
}

impl ClientWrapper {
    fn new(client: Client) -> Self {
        Self { client }
    }
}

impl std::ops::Deref for ClientWrapper {
    type Target = Client;
    fn deref(&self) -> &Client {
        &self.client
    }
}

impl std::ops::DerefMut for ClientWrapper {
    fn deref_mut(&mut self) -> &mut Client {
        &mut self.client
    }
}

#[async_trait]
impl deadpool::managed::Manager for Manager {
    type Type = ClientWrapper;
    type Error = tiberius::error::Error;

    async fn create(&self) -> Result<Self::Type, Self::Error> {
        let tcp = tokio::net::TcpStream::connect(self.config.get_addr()).await?;
        tcp.set_nodelay(true)?;
        let client = Client::connect(self.config.clone(), tcp.compat_write()).await?;
        let client_wrapper = ClientWrapper::new(client);
        Ok(client_wrapper)
    }

    async fn recycle(&self, _: &mut Self::Type) -> deadpool::managed::RecycleResult<Self::Error> {
        Ok(())
    }
}

数日前に deadpool version = "0.8.0" で deadpool::managed::Manager トレイトの仕様が変更されたので、それに合わせてあります。

(4) ソースコード: src/main.rs

下記のソースコードをコピーして src/main.rs を書き換えます。
※ソース中の conn_str の内容は、試す環境に合わせて変更してください。

mod mssql;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("#### Start ####");
    let started = std::time::Instant::now();

    let conn_str = "Server=tcp:localhost,1433;TrustServerCertificate=true;Database=my_test_db;UID=sa;PWD=abcd1234$";
    let config = tiberius::Config::from_ado_string(conn_str)?;
    let manager = mssql::Manager::new(config);
    let pool = deadpool::managed::Pool::<mssql::Manager>::new(manager, 10);

    let mut threads = Vec::new();
    for idx in 0..20 {
        let pool = pool.clone();
        let handle = tokio::spawn(async move {
            println!("Thread #{}", idx);
            let mut conn = pool.get().await.unwrap();
            tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
            let _ = conn
                .simple_query("SELECT 番号,氏名,誕生日 FROM 会員名簿 ORDER BY 誕生日 DESC")
                .await
                .unwrap()
                .into_first_result()
                .await
                .unwrap()
                .iter()
                .map(|row| {
                    println!(
                        "#{} | {} | {} | {} |",
                        idx,
                        row.get::<i32, _>("番号").unwrap(),
                        row.get::<&str, _>("氏名").unwrap(),
                        row.get::<chrono::NaiveDate, _>("誕生日")
                            .unwrap()
                            .format("%Y/%m/%d"),
                    )
                })
                .collect::<Vec<_>>();
        });
        threads.push(handle);
    }
    futures::future::join_all(threads).await;

    println!("#### Finish ####");
    println!("経過時間: {:?}", started.elapsed());
    Ok(())
}

準備は以上です。

3. 実行結果

接続プールの最大値を 10 に設定した場合の実行結果です。

$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.05s
     Running `target/debug/pool_tokio`
#### Start ####
Thread #0
Thread #1
Thread #4

・・・(ざっくり省略)・・・

#18 | 105 | 江口 美奈 | 1979/06/23 |
#13 | 307 | 中井 雄樹 | 1984/02/29 |
#11 | 307 | 中井 雄樹 | 1984/02/29 |
#13 | 105 | 江口 美奈 | 1979/06/23 |
#11 | 105 | 江口 美奈 | 1979/06/23 |
#### Finish ####
経過時間: 554.619646ms
$ 

4. おまけ

deadpool = "0.7" の場合、src/mssql.rs の内容はこちらになります。

use async_trait::async_trait;
use tokio_util::compat::TokioAsyncWriteCompatExt;

pub struct Manager {
    config: tiberius::Config,
}

impl Manager {
    pub fn new(config: tiberius::Config) -> Self {
        Self { config }
    }
}

type Connection = tiberius::Client<tokio_util::compat::Compat<tokio::net::TcpStream>>;
type Error = tiberius::error::Error;

#[async_trait]
impl deadpool::managed::Manager<Connection, Error> for Manager {
    async fn create(&self) -> Result<Connection, Error> {
        let tcp = tokio::net::TcpStream::connect(self.config.get_addr()).await?;
        tcp.set_nodelay(true)?;
        let client = tiberius::Client::connect(self.config.clone(), tcp.compat_write()).await?;
        Ok(client)
    }
    async fn recycle(&self, _conn: &mut Connection) -> deadpool::managed::RecycleResult<Error> {
        Ok(())
    }
}

その際、src/main.rs のlet pool = 〜の行も修正します。

    let pool = deadpool::managed::Pool::new(manager, 10);

deadpool = "0.7" では接続プールから接続そのものを取得する仕様だったのですが、deadpool = "0.8" からオブジェクトを取得するようになりました。

今日はこの事実に気づかず、先週まで動作してたプロジェクトがコンパイルエラーになって焦りました。

私の用途だと、前の仕様のままでも十分なんですけどね。

....

ここでは tokio での実装サンプルを掲載しましたが、async-std でも同様でした。

ライブラリ化するなら、bb8-tiberius と deadpool-postgres を参考に Managed pool と Unmanaged pool を実装して、Config に対応する感じでしょうか。

mobc も同様のノリで実装してみましたが、Web 系フレームワークとの組み合わせでは deadpool の方が性能良かったので、deadpool を採用しました。
※あくまで私が試した環境での話です。

tokio では rweb、async-std では tide との組み合わせで、ほぼ期待通りの結果が得られています。