deadpool-r2d2とr2d2_odbc_apiを組み合わせたら、非同期処理でODBCからRDBを扱えた
先般、macOS, Linux 環境で Rust から odbc-api クレートを利用しました。
ここでは MS 版 ODBC ドライバに寄せて評価しましたが、odbc-api クレートはどの ODBC ドライバでも利用できる汎用的なものです。
Rust から DBMS に接続する際、専用クレートが存在しなくても ODBC ドライバがあれば解決できるかもしれません。
....
単一の ODBC 接続は評価できたので、次は接続プールです。
以降の内容は、前述した記事の環境を前提とします。
※macOS, Linux 環境で試しています。
1. 接続プール r2d2 を利用して並列処理
odbc-api クレートには、接続プール r2d2 に対応した専用の r2d2_odbc_api クレートがあります。
- https://crates.io/crates/r2d2
- https://crates.io/crates/r2d2_odbc_api
- https://crates.io/crates/odbc-api
まずは r2d2_odbc_api クレートの動作を確認してみます。
・Cargo.toml
[dependencies] r2d2 = "0.8.9" r2d2_odbc_api = "0.1.4" anyhow = "1"
・ソースコード: src/main.rs
use r2d2_odbc_api::{Cursor, buffers::TextRowSet}; const BATCH_SIZE: usize = 5000; const BUFFER_SIZE: Option<usize> = Some(4096); fn main() -> anyhow::Result<()> { println!("#### Start ####"); let started = std::time::Instant::now(); //let conn_str = "Driver={ODBC Driver 17 for SQL Server};Server=tcp:localhost,1433;TrustServerCertificate=yes;UID=sa;PWD=abcd1234$;Database=my_test_db"; let conn_str = "Driver={ODBC Driver 18 for SQL Server};Server=tcp:localhost,1833;TrustServerCertificate=yes;UID=sa;PWD=abcd1234$;Database=my_test_db"; let manager = r2d2_odbc_api::ODBCConnectionManager::new(conn_str); let pool = r2d2::Pool::builder().max_size(10).build(manager).unwrap(); let mut threads = Vec::new(); for idx in 0..20 { let pool = pool.clone(); let handle = std::thread::spawn(move || { println!("Thread #{}", idx); let conn = pool.get().unwrap(); std::thread::sleep(std::time::Duration::from_millis(200)); let conn = conn.raw(); let qry = "SELECT 番号,氏名,誕生日 FROM 会員名簿 ORDER BY 誕生日 DESC"; if let Some(cursor) = conn.execute(qry, ()).unwrap() { let mut buffers = TextRowSet::for_cursor(BATCH_SIZE, &cursor, BUFFER_SIZE).unwrap(); let mut row_set_cursor = cursor.bind_buffer(&mut buffers).unwrap(); if let Some(batch) = row_set_cursor.fetch().unwrap() { for row_index in 0..batch.num_rows() { let id: i32 = std::str::from_utf8(batch.at(0, row_index).unwrap_or(&[])).unwrap().parse().unwrap(); let name = std::str::from_utf8(batch.at(1, row_index).unwrap_or(&[])).unwrap(); let birthday = std::str::from_utf8(batch.at(2, row_index).unwrap_or(&[])).unwrap(); println!("#{} | {} | {} | {} |", idx, id, name, birthday); } } }; }); threads.push(handle); } for th in threads { let _ = th.join(); } println!("#### Finish ####"); println!("経過時間: {:?}", started.elapsed()); Ok(()) }
接続プール数の指定や接続の取り出しは、他の r2d2 アダプタと同様に記述できます。
・実行結果
スレッド別にクエリを実行した結果が表示されるはずです。
※出力結果は、過去記事と同様なので、ここでは省略します。
....
ところで、接続プール r2d2 は async/await 構文に対応していません。
odbc-api が依存する odbc-sys も一部に注意制限事項があります。
せっかく並列処理を記述できるのに、今時の非同期ランタイムに対応していないので、使い所が難しい様に思います。
私は Rocket v0.4 で利用しましたが、状況によってはお奨めできません。
そこで試したのが、deadpool-r2d2 クレートとの併用です。
2. 非同期タスク対応の deadpool-r2d2 で r2d2_odbc_api を補助
非同期な汎用プール deadpool のアダプタに r2d2 をバックエンドとする deadpool-r2d2 があります。
理論上、deadpool / deadpool-r2d2 で環境を整えれば、r2d2 のアダプタを非同期タスク対応にできるはずなので、この方針で r2d2_odbc_api を試してみます。
・Cargo.toml
[dependencies] deadpool = "0.9" deadpool-r2d2 = "0.2.0" r2d2_odbc_api = "0.1.4" tokio = "1.18"
futures = "0.3" anyhow = "1"
・ソースコード: src/main.rs
use r2d2_odbc_api::{Cursor, buffers::TextRowSet}; type ODBCManager = deadpool_r2d2::Manager<r2d2_odbc_api::ODBCConnectionManager>; type ODBCPool = deadpool_r2d2::Pool<ODBCManager>; const BATCH_SIZE: usize = 5000; const BUFFER_SIZE: Option<usize> = Some(4096); struct Member { id: i32, name: String, birthday: String, } #[tokio::main] async fn main() -> anyhow::Result<()> { println!("#### Start ####"); let started = std::time::Instant::now(); //let conn_str = "Driver={ODBC Driver 17 for SQL Server};Server=tcp:localhost,1433;TrustServerCertificate=yes;UID=sa;PWD=abcd1234$;Database=my_test_db"; let conn_str = "Driver={ODBC Driver 18 for SQL Server};Server=tcp:localhost,1833;TrustServerCertificate=yes;UID=sa;PWD=abcd1234$;Database=my_test_db"; let manager = ODBCManager::new( r2d2_odbc_api::ODBCConnectionManager::new(conn_str), deadpool_r2d2::Runtime::Tokio1, ); let pool = ODBCPool::builder(manager).max_size(10).build()?; let mut tasks = Vec::new(); for idx in 0..20 { let pool = pool.clone(); let handle = tokio::spawn(async move { println!("Async Task #{}", idx); let client = pool.get().await.unwrap(); tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; let result = client .interact(|client| { let mut records = Vec::<Member>::new(); let conn = client.raw(); let qry = "SELECT 番号,氏名,誕生日 FROM 会員名簿 ORDER BY 誕生日 DESC"; if let Some(cursor) = conn.execute(qry, ()).unwrap() { let mut buffers = TextRowSet::for_cursor(BATCH_SIZE, &cursor, BUFFER_SIZE).unwrap(); let mut row_set_cursor = cursor.bind_buffer(&mut buffers).unwrap(); if let Some(batch) = row_set_cursor.fetch().unwrap() { for row_index in 0..batch.num_rows() { records.push(Member { id: std::str::from_utf8(batch.at(0, row_index).unwrap_or(&[])).unwrap().parse().unwrap(), name: std::str::from_utf8(batch.at(1, row_index).unwrap_or(&[])).unwrap().into(), birthday: std::str::from_utf8(batch.at(2, row_index).unwrap_or(&[])).unwrap().into(), }); } } }; records }) .await .unwrap(); for row in &result { println!("#{} | {} | {} | {} |", idx, row.id, row.name, row.birthday); } }); tasks.push(handle); } futures::future::join_all(tasks).await; println!("#### Finish ####"); println!("経過時間: {:?}", started.elapsed()); Ok(()) }
このソースコードは tokio ランタイム用ですが、async-std でも同様に記述できます。
....
deadpool-r2d2 では、接続プールから接続を取り出す際、.interact() に渡すクロージャ内で同期的処理が完結するように記述します。
理想を言えば ODBC の SQL メソッド毎に await したいのですが、これはできません。
非同期タスクに対応していないものを、むやみに非同期化しても、どこかで綻びが生じるので「 deadpool-r2d2 内部の SyncGuard が保護する範囲内で ODBC に関する処理を済ませ、その結果を返却するもの」として、割り切ってます。
ソースコード中では、結果セット用の構造体を用意して、クエリの取得結果をロー毎に Vec<T> へ格納し、戻り値としています。
・実行結果
非同期タスク別に実行された結果が表示されます。
$ cargo run Finished dev [unoptimized + debuginfo] target(s) in 0.21s Running `target/debug/tokio_odbcapi` #### Start #### Async Task #0 Async Task #1 Async Task #2
・・・(ざっくり省略)・・・
#19 | 105 | 江口 美奈 | 1979-06-23 | #19 | 210 | 荒井 伸次郎 | 1974-01-30 | #### Finish #### 経過時間: 746.914333ms
3. 所感
環境を整えるだけで、無事に非同期タスクで処理できました。
試した ODBC ドライバは SQLServer (msodbc, tdsodbc) と PostgreSQL だけですが、他の ODBC ドライバでも利用できると思います。
ここまで手軽に ODBC 接続を非同期ランタイムで利用できるとは思いませんでした。
....
元々、deadpool は tiberius の接続プールに利用したくて調査したものです。
その際、deadpool-r2d2 の存在を知り、ずっと気になってました。
今回の結果を踏まえて、当時試作した rweb (tokio) と tide (async-std) のプロジェクトを修正してみましたが、deadpool-r2d2 / r2d2_odbc_api で問題なく動作しています。
....
なお、deadpool はアダプタが豊富なので、PostgreSQL, Sqlite 等については専用クレート、MySQL は deadpool-r2d2 / r2d2_mysql の組み合わせ等があります。
目的の DBMS が単一の場合、専用クレートや専用ライブラリ・ツールに優位性があるので、積極的に ODBC ドライバを利用する機会は少ないかもしれません。
それでも、非同期タスクで異なる DBMS を扱える deadpool-r2d2 / r2d2_odbc_api / ODBC ドライバの組み合わせは便利だと思います。