刹那(せつな)の瞬き

Willkömmen! Ich heiße Setsuna. Haben Sie etwas Zeit für mich?

deadpool-r2d2とr2d2_odbc_apiを組み合わせたら、非同期処理でODBCからRDBを扱えた

先般、macOS, Linux 環境で Rust から odbc-api クレートを利用しました。

ここでは MS 版 ODBC ドライバに寄せて評価しましたが、odbc-api クレートはどの ODBC ドライバでも利用できる汎用的なものです。

Rust から DBMS に接続する際、専用クレートが存在しなくても ODBC ドライバがあれば解決できるかもしれません。

....

単一の ODBC 接続は評価できたので、次は接続プールです。

以降の内容は、前述した記事の環境を前提とします。
macOS, Linux 環境で試しています。

1. 接続プール r2d2 を利用して並列処理

odbc-api クレートには、接続プール r2d2 に対応した専用の r2d2_odbc_api クレートがあります。

まずは r2d2_odbc_api クレートの動作を確認してみます。

・Cargo.toml
[dependencies]
r2d2 = "0.8.9"
r2d2_odbc_api = "0.1.4"
anyhow = "1"
ソースコード: src/main.rs
use r2d2_odbc_api::{Cursor, buffers::TextRowSet};

const BATCH_SIZE: usize = 5000;
const BUFFER_SIZE: Option<usize> = Some(4096);

fn main() -> anyhow::Result<()> {
    println!("#### Start ####");
    let started = std::time::Instant::now();

    //let conn_str = "Driver={ODBC Driver 17 for SQL Server};Server=tcp:localhost,1433;TrustServerCertificate=yes;UID=sa;PWD=abcd1234$;Database=my_test_db";
    let conn_str = "Driver={ODBC Driver 18 for SQL Server};Server=tcp:localhost,1833;TrustServerCertificate=yes;UID=sa;PWD=abcd1234$;Database=my_test_db";
    let manager = r2d2_odbc_api::ODBCConnectionManager::new(conn_str);
    let pool = r2d2::Pool::builder().max_size(10).build(manager).unwrap();

    let mut threads = Vec::new();
    for idx in 0..20 {
        let pool = pool.clone();
        let handle = std::thread::spawn(move || {
            println!("Thread #{}", idx);
            let conn = pool.get().unwrap();
            std::thread::sleep(std::time::Duration::from_millis(200));
            let conn = conn.raw();
            let qry = "SELECT 番号,氏名,誕生日 FROM 会員名簿 ORDER BY 誕生日 DESC";
            if let Some(cursor) = conn.execute(qry, ()).unwrap() {
                let mut buffers = TextRowSet::for_cursor(BATCH_SIZE, &cursor, BUFFER_SIZE).unwrap();
                let mut row_set_cursor = cursor.bind_buffer(&mut buffers).unwrap();
                if let Some(batch) = row_set_cursor.fetch().unwrap() {
                    for row_index in 0..batch.num_rows() {
                        let id: i32 = std::str::from_utf8(batch.at(0, row_index).unwrap_or(&[])).unwrap().parse().unwrap();
                        let name =  std::str::from_utf8(batch.at(1, row_index).unwrap_or(&[])).unwrap();
                        let birthday = std::str::from_utf8(batch.at(2, row_index).unwrap_or(&[])).unwrap();
                        println!("#{} | {} | {} | {} |", idx, id, name, birthday);
                    }
                }
            };
        });
        threads.push(handle);
    }
    for th in threads {
        let _ = th.join();
    }

    println!("#### Finish ####");
    println!("経過時間: {:?}", started.elapsed());
    Ok(())
}

接続プール数の指定や接続の取り出しは、他の r2d2 アダプタと同様に記述できます。

・実行結果

スレッド別にクエリを実行した結果が表示されるはずです。
※出力結果は、過去記事と同様なので、ここでは省略します。

....

ところで、接続プール r2d2 は async/await 構文に対応していません。
odbc-api が依存する odbc-sys も一部に注意制限事項があります。

せっかく並列処理を記述できるのに、今時の非同期ランタイムに対応していないので、使い所が難しい様に思います。

私は Rocket v0.4 で利用しましたが、状況によってはお奨めできません。

そこで試したのが、deadpool-r2d2 クレートとの併用です。

 

2. 非同期タスク対応の deadpool-r2d2r2d2_odbc_api を補助

非同期な汎用プール deadpool のアダプタに r2d2 をバックエンドとする deadpool-r2d2 があります。

理論上、deadpool / deadpool-r2d2 で環境を整えれば、r2d2 のアダプタを非同期タスク対応にできるはずなので、この方針で r2d2_odbc_api を試してみます。

・Cargo.toml
[dependencies]
deadpool = "0.9"
deadpool-r2d2 = "0.2.0"
r2d2_odbc_api = "0.1.4"
tokio = "1.18"
futures = "0.3" anyhow = "1"
ソースコード: src/main.rs
use r2d2_odbc_api::{Cursor, buffers::TextRowSet};

type ODBCManager = deadpool_r2d2::Manager<r2d2_odbc_api::ODBCConnectionManager>;
type ODBCPool = deadpool_r2d2::Pool<ODBCManager>;

const BATCH_SIZE: usize = 5000;
const BUFFER_SIZE: Option<usize> = Some(4096);

struct Member {
    id: i32,
    name: String,
    birthday: String,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    println!("#### Start ####");
    let started = std::time::Instant::now();

    //let conn_str = "Driver={ODBC Driver 17 for SQL Server};Server=tcp:localhost,1433;TrustServerCertificate=yes;UID=sa;PWD=abcd1234$;Database=my_test_db";
    let conn_str = "Driver={ODBC Driver 18 for SQL Server};Server=tcp:localhost,1833;TrustServerCertificate=yes;UID=sa;PWD=abcd1234$;Database=my_test_db";
    let manager = ODBCManager::new(
        r2d2_odbc_api::ODBCConnectionManager::new(conn_str),
        deadpool_r2d2::Runtime::Tokio1,
    );
    let pool = ODBCPool::builder(manager).max_size(10).build()?;

    let mut tasks = Vec::new();
    for idx in 0..20 {
        let pool = pool.clone();
        let handle = tokio::spawn(async move {
            println!("Async Task #{}", idx);
            let client = pool.get().await.unwrap();
            tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
            let result = client
                .interact(|client| {
                    let mut records = Vec::<Member>::new();
                    let conn = client.raw();
                    let qry = "SELECT 番号,氏名,誕生日 FROM 会員名簿 ORDER BY 誕生日 DESC";
                    if let Some(cursor) = conn.execute(qry, ()).unwrap() {
                        let mut buffers = TextRowSet::for_cursor(BATCH_SIZE, &cursor, BUFFER_SIZE).unwrap();
                        let mut row_set_cursor = cursor.bind_buffer(&mut buffers).unwrap();
                        if let Some(batch) = row_set_cursor.fetch().unwrap() {
                            for row_index in 0..batch.num_rows() {
                                records.push(Member {
                                    id: std::str::from_utf8(batch.at(0, row_index).unwrap_or(&[])).unwrap().parse().unwrap(),
                                    name: std::str::from_utf8(batch.at(1, row_index).unwrap_or(&[])).unwrap().into(),
                                    birthday: std::str::from_utf8(batch.at(2, row_index).unwrap_or(&[])).unwrap().into(),
                                });
                            }
                        }
                    };
                    records
                })
                .await
                .unwrap();
            for row in &result {
                println!("#{} | {} | {} | {} |", idx, row.id, row.name, row.birthday);
            }
        });
        tasks.push(handle);
    }
    futures::future::join_all(tasks).await;

    println!("#### Finish ####");
    println!("経過時間: {:?}", started.elapsed());
    Ok(())
}

このソースコードtokio ランタイム用ですが、async-std でも同様に記述できます。

....

deadpool-r2d2 では、接続プールから接続を取り出す際、.interact() に渡すクロージャ内で同期的処理が完結するように記述します。

理想を言えば ODBCSQL メソッド毎に await したいのですが、これはできません。

非同期タスクに対応していないものを、むやみに非同期化しても、どこかで綻びが生じるので「 deadpool-r2d2 内部の SyncGuard が保護する範囲内で ODBC に関する処理を済ませ、その結果を返却するもの」として、割り切ってます。

ソースコード中では、結果セット用の構造体を用意して、クエリの取得結果をロー毎に Vec<T> へ格納し、戻り値としています。

 

・実行結果

非同期タスク別に実行された結果が表示されます。

$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.21s
     Running `target/debug/tokio_odbcapi`
#### Start ####
Async Task #0
Async Task #1
Async Task #2

・・・(ざっくり省略)・・・
#19 | 105 | 江口 美奈 | 1979-06-23 | #19 | 210 | 荒井 伸次郎 | 1974-01-30 | #### Finish #### 経過時間: 746.914333ms

 

3. 所感

環境を整えるだけで、無事に非同期タスクで処理できました。

試した ODBC ドライバは SQLServer (msodbc, tdsodbc) と PostgreSQL だけですが、他の ODBC ドライバでも利用できると思います。

ここまで手軽に ODBC 接続を非同期ランタイムで利用できるとは思いませんでした。

....

元々、deadpool は tiberius の接続プールに利用したくて調査したものです。

その際、deadpool-r2d2 の存在を知り、ずっと気になってました。

今回の結果を踏まえて、当時試作した rweb (tokio) と tide (async-std) のプロジェクトを修正してみましたが、deadpool-r2d2 / r2d2_odbc_api で問題なく動作しています。

....

なお、deadpool はアダプタが豊富なので、PostgreSQL, Sqlite 等については専用クレート、MySQL は deadpool-r2d2 / r2d2_mysql の組み合わせ等があります。

目的の DBMS が単一の場合、専用クレートや専用ライブラリ・ツールに優位性があるので、積極的に ODBC ドライバを利用する機会は少ないかもしれません。

それでも、非同期タスクで異なる DBMS を扱える deadpool-r2d2 / r2d2_odbc_api / ODBC ドライバの組み合わせは便利だと思います。