sunabox

Goのchを使ってリクエストごとのDB書き込み処理をバッチ処理にする

前回の記事で、非同期でキャッシュを定期的に更新し続けるという手法について紹介しました。

非同期バッチ処理によるキャッシュ更新戦略 | sunaboxリクエスト数が増えてもオリジンのDBに負荷をかけない、非同期バッチ処理によるキャッシュの更新戦略について
faviconsuna.dev

goroutineを使って別プロセスで定期的に何らかの処理を実行というのはわりと汎用性のある手法だなと思います。
今回はその手法を応用して、リクエストごとのDynamoDBへの書き込み処理をバッチ処理にした方法について紹介します。

chなどもうまく使えてGoらしい方法で目的を達成できたのではないかと思います。

背景

今回対象としたのは広告配信における予算消化金額をDynamoDBに書き込み処理する部分です。
リクエストから金額の情報を取り出し、まだ設定した予算を超えていなければDynamoDBにADDで加算処理をします。
以下が、そのチェックと加算処理のメソッドです

Go Icon
budget.go
func (b budgetService) consume(ctx context.Context, price float64) error {
	// 消化した予算が設定した予算を超えているかどうかチェック
 
	// 超えていないければDynamoDBへの消化予算の加算処理
	b.writeBudgetConsumption(ctx, price)
}

つまり、DynamoDBのテーブルでは現在消化している予算の金額の累計値が管理されています。
これまではリクエストごとに書き込み処理をしていたため、リクエスト数 = DynamoDBの書き込み回数となっていました

そこで今回は、加算した金額を一定期間ごとにまとめて書き込む処理に変更します

このサーバーでは主に書き込み処理をするだけですが、別のサーバーでこの累計値を読み取るロジックが存在します
ユースケースによりますが、今回の場合読み込み側もそこまでリアルタイム性が求められるものでもないので、加算された金額の情報が一定期間遅れる分は許容としています。

変更後の書き込み処理

リクエストごとの金額をどうまとめて書き込みするかですが、今回は金額の累計値を変数で管理しておいて、一定期間ごとにその累計値を書き込み処理します。

書き込み処理自体はリクエストとは別goroutineで行うようにしたいです

そこで、Goのchを使ってこれらのやり取りを行います。具体的には、以下のように責務が分かれます

リクエスト時: chで金額を送るのみ
別goroutine: chから金額を受け取って変数に加算、一定期間ごとに書き込み処理
一つずつ見ていきます

リクエスト時の処理

書き込み処理をしていた部分をレシーバーに設定しておいたbudgetChに対して送信するだけにしています
(本来はpriceをそのまま送っているのではなく、必要な情報を格納した構造体を送っていますが便宜上省略します)

Go Icon
budget.go
func (b budgetService) consume(ctx context.Context, price float64) error {
	// 消化した予算が設定した予算を超えているかどうかチェック
 
	// 超えていないければchに消化する金額を送信
	b.budgetCh <- price
}

DynamoDBに書き込み処理を行っていた部分がchに対して送信するだけになりました

別goroutineでの処理

リクエストとは関係ないgoroutineで、いわゆるfor-selectパターンを使ってchから受信した時の処理を行なっています。
chから受信した金額をsumに加算処理しています。
一方でtime.Tickerを使って一定期間ごとにsumを書き込み処理するようにしました。
書き込んだ後はsumを初期化しています。

Go Icon
budget.go
func (b budgetService) WritePeriodically(
	ctx context.Context,
	interval time.Duration
) {
	ticker := time.NewTicker(interval)
	sum := 0.0
 
	go func() {
		for {
			select {
			case price := <-b.budgetCh:
				// 加算処理
				sum += price
			case <-ticker.C:
				// DynamoDBへの書き込み処理
				if err := b.writeBudgetConsumption(ctx, price); err != nil {
					logger.Log.Errorfx(ctx, "failed to writeBudgetConsumption. err: %v", err)
				}
				// 累積値を初期化
				sum = 0.0
		}
	}()
}

intervalは引数で受け取れるようにしてあります。今回は1minに設定しました。
これでリクエスト数とは関係なく1minごとに書き込み処理が行われるので、DynamoDBへの書き込み回数が大幅に削減されるようになりました。

shutdown時を考慮した処理

しかし、ここで新たな問題が浮上します。
intervalで書き込み処理をするまでの間に、サーバーのスケールインやdeployなどでタスクが削除されたときは加算処理したものが書き込まれません。
そのため、shutdown処理が行われた時にはintervalを待たずに書き込み処理を行うようにしたいです。

ここでは、これを実現するために2つのchを引数で受け取るようにします。
1つはプロセス終了のシグナルを受け取った時に通知される受信専用ch(sigCh)で、もう1つは書き込み処理完了時にそれを通知するための送信専用ch(doneCh)です。

これらを使って先ほどのcase処理に「sigChからイベントを受け取った場合にDynamoDBへの書き込み処理を行ってdoneChに通知する」という処理を追加します。
(doneChの意味は後述します)

Go Icon
budget.go
func (b budgetService) WritePeriodically(
	ctx context.Context,
	interval time.Duration,
	sigCh <-chan struct{}, // shutdown処理を行なったときに通知される受信専用ch
	doneCh chan<- struct{} // 書き込み処理が完了した後に書き込む送信専用ch
) {
	ticker := time.NewTicker(interval)
	sum := 0.0
 
	go func() {
		for {
			select {
			case price := <-b.budgetCh:
				// 加算処理
				sum += price
			case <-ticker.C:
				// DynamoDBへの書き込み処理
				if err := b.writeBudgetConsumption(ctx, price); err != nil {
					logger.Log.Errorfx(ctx, "failed to writeBudgetConsumption. err: %v", err)
				}
				// 累積値を初期化
				sum = 0.0
			case <-sigCh:
				// DynamoDBへの書き込み処理
				if err := b.writeBudgetConsumption(ctx, price); err != nil {
					logger.Log.Errorfx(ctx, "failed to writeBudgetConsumption. err: %v", err)
				}
				ticker.Stop()
				doneCh <- struct{}{}
				return
		}
	}()
}

shutdown処理を行う側の処理は以下のようにしています。
Goの*http.Server.Shutdownはgraceful shutdownなので、処理中のリクエストが完了した後にsigChに対して書き込みを行ないます。(先ほど見たようにこれによってDynamoDBへの書き込み処理が行われます。)
DynamoDBへの書き込みが完了するまではプロセスを終わらせたくないので、doneChからの受信を同期的に待つことでプロセス終了処理をブロックしています。

Go Icon
main.go
if err := srv.Shutdown(tctx); err != nil {
	logger.Log.Error(err.Error())
}
 
sigCh <- struct{}{}
<-doneCh // 書き込み処理が完了するまで後続処理をブロックする
 
return 0 // 0が返されることでos.Exit(0)でプロセスが終えられる。

最終的にDynamoDBへの書き込み処理を行うのはintervalの間隔とshutdown時に限定され、リクエストごとに書き込み処理することは無くなりました。

ちなみにこの機構に変更したことによってレスポンス速度も30msくらい削減でき、DynamoDBへの書き込み回数が減ったので月数百ドル程度のコスト削減にもなりました。
今後スケールしてリクエスト数が増えても対応できる機構になったこともポイントです。

テストの書き方

変更後の処理ではリクエストごとに書き込みを行なっていた処理がinterval間隔およびshutdown時に行われるようになりました。

そうするとconsumeメソッドのテストのassertionもinterval期間だけsleepする必要があるのかと思われそうですが、そうではありません。
最後に見たようにsigChを使えばGoらしい書き方で記述することができます。

Go Icon
budget_test.go
sigCh := make(chan struct{})
doneCh := make(chan struct{})
 
// テストしたいメソッドとは関係ないが、書き込み処理を行うために実行
WritePeriodically(ctx, 1*time.Nanosecond, sigCh, doneCh)
 
// テストしたいメソッドの実行
err := consume(ctx, tt.args.price)
 
// 書き込みを即座に実行させる
sigCh <- struct{}{}
<-doneCh
 
// 以降で、書き込まれた金額が意図通りかをassertionする

intervalは1ナノ秒に設定してますが、それでもsigChを使わないと書き込みがされずflakyなテストになっていたのでちゃんと即座に書き込み処理をするのがよいです。

まとめ

今回はリクエストごとに行なっていた書き込み処理をgoroutineを使って定期的なバッチ処理にしました。

似たような事はたとえばSQSとLambdaを組み合わせても実現できると思いますが、管理しなければならないインフラリソースが増えるのに比べて今回は一つのサーバー内で完結させられたので個人的には今回の方法の方が好みです。

記述に関してもfor-selectパターンにchを組み込んで、やりとりされるイベントベースで処理を記述することができ、Goらしい書き方で実現できたのではないかと思ってます。

Buy Me A Coffeeのbutton

目次