モチベーション
pubsub トリガーの cloud function は非同期にリクエストを投げるため、同期的な API と比べ自動テストを行なうのが困難です(成功失敗のレスポンスが帰ってこないため)。 また、今回僕が作成した function は特に gcp 上でテストを行ないたいものだったため、この問題を真剣に考える必要がありました。
pubsub のリクエストの確認
まず、テストを導入するにあたって、テストで topic を push するさいの方法を確認します。 以下のような形で topic の push をします。
$ gcloud pubsub topics publish sample-topic
messageIds:
- '2134505983662900'
上記のようにレスポンスで返ってくるのは message の ID のみです。 この情報からどのように cloud function の成功失敗を取ってくるかが肝になります。
cloud function に渡される情報
次に cloud function を見ていきます。cloud function で実行される関数は以下のようなものです。
type pubSubMessage struct {
Data []byte `json:"data"`
}
func ExampleFunction(ctx context.Context, m pubSubMessage) error {
// 処理...
return nil
}
この ctx から metadata 関数を使用することで、EventID を取得することができます。 これが先程の messageIds の値と一致します。
import "cloud.google.com/go/functions/metadata"
// ...
meta, err := metadata.FromContext(ctx)
_ = meta.EventID
今回はこいつをログに仕込むことで、pubsub の push 元からエラーが起きたかどうか、分かるようにすることにしました。
log に EventID を仕込む
gcp では、cloud logging に適切に登録できれば、絞り込みを使用して log を特定することは容易です。
特にX-Cloud-Trace-Context
が取れれば logging agent で簡単に cloud logging に登録ができたのですが、
pubsub 経由の cloud function からは取れなかったため、cloud.google.com/go/logging
を使用した logging を作成しました(頑張れば取れた?)。
実際のコードは以下のような形です。
var baseLog *logging.Logger
type logger struct {
traceID string
spanID string
eventID string
*logging.Logger
}
type Payload map[string]interface{}
func NewLogger(ctx context.Context) (*logger, *metadata.Metadata) {
sctx := trace.FromContext(ctx).SpanContext()
meta, err := metadata.FromContext(ctx)
if err != nil {
panic(err)
}
return &logger{
traceID: sctx.TraceID.String(),
eventID: meta.EventID,
spanID: sctx.SpanID.String(),
Logger: baseLog,
}, meta
}
func (l *logger) Info(msg string, payload Payload) {
payload["text"] = msg
l.Log(logging.Info, payload)
}
func (l *logger) Warn(msg string, payload Payload) {
payload["text"] = msg
l.Log(logging.Warning, payload)
}
func (l *logger) Error(msg string, payload Payload) {
payload["text"] = msg
l.Log(logging.Error, payload)
}
func (l *logger) Panic(msg string, err error, payload Payload) {
payload["text"] = msg
l.Log(logging.Emergency, payload)
panic(err)
}
func (l *logger) Log(level logging.Severity, payload Payload) {
pc, file, line, _ := runtime.Caller(2)
l.Logger.Log(logging.Entry{
Severity: level,
Payload: payload,
Labels: map[string]string{
"event": l.eventID,
},
Resource: &monitoredres.MonitoredResource{
Type: "cloud_function",
Labels: map[string]string{
"function_name": "ExampleFunction",
"region": "asia-northeast1",
"project_id": projectID,
},
},
Trace: "projects/" + projectID + "/traces/" + l.traceID,
SpanID: l.spanID,
TraceSampled: true,
SourceLocation: &logging2.LogEntrySourceLocation{
File: file,
Line: int64(line),
Function: runtime.FuncForPC(pc).Name(),
},
})
}
後は以上のコードを logger として使用し、エラー発生時に適切なログを吐くようにします。
l.Info("start fuction", Payload{
"event": meta.EventID,
})
l.Error("check exist", Payload{
"error": err.Error(),
"eventID": meta.EventID,
})
特定のログを取得
関数側の準備もできたため、あとは以下のことを行なうスクリプトを書くのみです。 ログが取得できないときもエラーとしたかったため、エラーレベルでの絞りこみはなしにしました。
- pubsub に push
- 関数の実行が終わるまで待機
- messageids より実行ログを取得
- 実行ログが存在すること、error や warn がないことを確認
上記の流れのスクリプトは以下です。
#!/bin/bash
set -ux
project=$1
message_id=$(gcloud pubsub topics publish sample-topic \
--attribute=[from="test-cli"] \
--message='{"type":"test", "body": "テスト"}' \
--project $1)
id=$(echo $message_id | sed -e "s/messageIds: - //g" | sed -e "s/'//g")
echo "sleeping..."
sleep 10
log=$(gcloud logging read labels.event="${id}")
if [ -z "${log}" ]; then
echo "log is empty"
exit 1;
fi
errors=$(echo $log | grep -E "severity: (WARNING|ERROR|EMERGENCY)")
if [ -z "${errors}" ]; then
echo "ok"
exit 0;
fi
echo "${errors} is happened"
exit 1
CI 実行
あとはこのスクリプトを CI で実行してあげる感じです。権限として以下の権限が必要でした。
- Logs Viewer
- Pub/Sub Publisher
まとめ
結合テストを行ないたかったので pubsub & cloud function の結合テストを自動化するのを頑張ってみました。 cloud logging を使う必要があるため、簡単に導入するには難しいですが、できると心理的安全があるので、やってよかったとは思います。
個人的には、cloud function の実行の終了を sleep で待っているのが微妙なので、何とかしたかったんですが、特に思いつかなかったです…😢