以前、PostgreSQL で非同期な接続プールに deadpool を利用したことがあります。
- GitHub - bikeshedder/deadpool: Dead simple pool implementation for rust with async-await
- deadpool/postgres at master · bikeshedder/deadpool · GitHub
PostgreSQL への接続では deadpool を直接扱うのではなく、deadpool-postgres (w/ tokio-postgres) が用意されているので、そちらを利用します。
deadpool 自体は汎用的な接続プールなので、tiberius と組み合わせて、構造体やトレイトを実装すれば、SQLServer への接続プールとして利用できるのですが、今のところ crates.io で探しても該当するものはありません。
私の場合、deadpool の Managed pool が利用できれば十分なので、とりあえず tokio ランタイムで動作するようにしてみました。
1. 環境とデータベース
Rustでasync/awaitに対応したTiberiusからSQLServerに接続する
※ここで構築した環境を引き続き利用します。
- OS: KDE neon 5.21.5 (Ubuntu 20.04 ベース) / macOS Mojave v10.14.6
- rustc 1.52.1
- SQL Server 2019 (RTM-CU10) (KB5001090) - 15.0.4123.1 (X64) on Linux
- データベース: my_test_db
2. プロジェクト
(1) 準備
適当なディレクトリにプロジェクトを作成します。(今回は ~/work/pool_tokio)
$ cd ~/work $ cargo new pool_tokio $ cd pool_tokio
(2) Cargo.toml の編集
[dependencies] セクションに下記の内容を追加します。
[dependencies] tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.6", features = ["compat"] } async-trait = "0.1" deadpool = "0.8" tiberius = { version = "0.5", features = ["chrono"] } chrono = "0.4" futures = "0.3"
(3) ソースコード: src/mssql.rs
プロジェクトの src ディレクトリに下記のソースコードを mssql.rs として保存します。
use async_trait::async_trait; use tokio_util::compat::TokioAsyncWriteCompatExt; pub struct Manager { config: tiberius::Config, } impl Manager { pub fn new(config: tiberius::Config) -> Self { Self { config } } } type Client = tiberius::Client<tokio_util::compat::Compat<tokio::net::TcpStream>>; pub struct ClientWrapper { client: Client, } impl ClientWrapper { fn new(client: Client) -> Self { Self { client } } } impl std::ops::Deref for ClientWrapper { type Target = Client; fn deref(&self) -> &Client { &self.client } } impl std::ops::DerefMut for ClientWrapper { fn deref_mut(&mut self) -> &mut Client { &mut self.client } } #[async_trait] impl deadpool::managed::Manager for Manager { type Type = ClientWrapper; type Error = tiberius::error::Error; async fn create(&self) -> Result<Self::Type, Self::Error> { let tcp = tokio::net::TcpStream::connect(self.config.get_addr()).await?; tcp.set_nodelay(true)?; let client = Client::connect(self.config.clone(), tcp.compat_write()).await?; let client_wrapper = ClientWrapper::new(client); Ok(client_wrapper) } async fn recycle(&self, _: &mut Self::Type) -> deadpool::managed::RecycleResult<Self::Error> { Ok(()) } }
数日前に deadpool version = "0.8.0" で deadpool::managed::Manager トレイトの仕様が変更されたので、それに合わせてあります。
(4) ソースコード: src/main.rs
下記のソースコードをコピーして src/main.rs を書き換えます。
※ソース中の conn_str の内容は、試す環境に合わせて変更してください。
mod mssql; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { println!("#### Start ####"); let started = std::time::Instant::now(); let conn_str = "Server=tcp:localhost,1433;TrustServerCertificate=true;Database=my_test_db;UID=sa;PWD=abcd1234$"; let config = tiberius::Config::from_ado_string(conn_str)?; let manager = mssql::Manager::new(config); let pool = deadpool::managed::Pool::<mssql::Manager>::new(manager, 10); let mut threads = Vec::new(); for idx in 0..20 { let pool = pool.clone(); let handle = tokio::spawn(async move { println!("Thread #{}", idx); let mut conn = pool.get().await.unwrap(); tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; let _ = conn .simple_query("SELECT 番号,氏名,誕生日 FROM 会員名簿 ORDER BY 誕生日 DESC") .await .unwrap() .into_first_result() .await .unwrap() .iter() .map(|row| { println!( "#{} | {} | {} | {} |", idx, row.get::<i32, _>("番号").unwrap(), row.get::<&str, _>("氏名").unwrap(), row.get::<chrono::NaiveDate, _>("誕生日") .unwrap() .format("%Y/%m/%d"), ) }) .collect::<Vec<_>>(); }); threads.push(handle); } futures::future::join_all(threads).await; println!("#### Finish ####"); println!("経過時間: {:?}", started.elapsed()); Ok(()) }
準備は以上です。
3. 実行結果
接続プールの最大値を 10 に設定した場合の実行結果です。
$ cargo run Finished dev [unoptimized + debuginfo] target(s) in 0.05s Running `target/debug/pool_tokio` #### Start #### Thread #0 Thread #1 Thread #4 ・・・(ざっくり省略)・・・ #18 | 105 | 江口 美奈 | 1979/06/23 | #13 | 307 | 中井 雄樹 | 1984/02/29 | #11 | 307 | 中井 雄樹 | 1984/02/29 | #13 | 105 | 江口 美奈 | 1979/06/23 | #11 | 105 | 江口 美奈 | 1979/06/23 | #### Finish #### 経過時間: 554.619646ms $
4. おまけ
deadpool = "0.7" の場合、src/mssql.rs の内容はこちらになります。
use async_trait::async_trait; use tokio_util::compat::TokioAsyncWriteCompatExt; pub struct Manager { config: tiberius::Config, } impl Manager { pub fn new(config: tiberius::Config) -> Self { Self { config } } } type Connection = tiberius::Client<tokio_util::compat::Compat<tokio::net::TcpStream>>; type Error = tiberius::error::Error; #[async_trait] impl deadpool::managed::Manager<Connection, Error> for Manager { async fn create(&self) -> Result<Connection, Error> { let tcp = tokio::net::TcpStream::connect(self.config.get_addr()).await?; tcp.set_nodelay(true)?; let client = tiberius::Client::connect(self.config.clone(), tcp.compat_write()).await?; Ok(client) } async fn recycle(&self, _conn: &mut Connection) -> deadpool::managed::RecycleResult<Error> { Ok(()) } }
その際、src/main.rs のlet pool = 〜
の行も修正します。
let pool = deadpool::managed::Pool::new(manager, 10);
deadpool = "0.7" では接続プールから接続そのものを取得する仕様だったのですが、deadpool = "0.8" からオブジェクトを取得するようになりました。
今日はこの事実に気づかず、先週まで動作してたプロジェクトがコンパイルエラーになって焦りました。
私の用途だと、前の仕様のままでも十分なんですけどね。
....
ここでは tokio での実装サンプルを掲載しましたが、async-std でも同様でした。
ライブラリ化するなら、bb8-tiberius と deadpool-postgres を参考に Managed pool と Unmanaged pool を実装して、Config に対応する感じでしょうか。
mobc も同様のノリで実装してみましたが、Web 系フレームワークとの組み合わせでは deadpool の方が性能良かったので、deadpool を採用しました。
※あくまで私が試した環境での話です。
tokio では rweb、async-std では tide との組み合わせで、ほぼ期待通りの結果が得られています。