モチベーション

pubsub トリガーの cloud function は非同期にリクエストを投げるため、同期的な API と比べ自動テストを行なうのが困難です(成功失敗のレスポンスが帰ってこないため)。 また、今回僕が作成した function は特に gcp 上でテストを行ないたいものだったため、この問題を真剣に考える必要がありました。

pubsub のリクエストの確認

まず、テストを導入するにあたって、テストで topic を push するさいの方法を確認します。 以下のような形で topic の push をします。

$ gcloud pubsub topics publish sample-topic
messageIds:
- '2134505983662900'

上記のようにレスポンスで返ってくるのは message の ID のみです。 この情報からどのように cloud function の成功失敗を取ってくるかが肝になります。

cloud function に渡される情報

次に cloud function を見ていきます。cloud function で実行される関数は以下のようなものです。


type pubSubMessage struct {
	Data       []byte `json:"data"`
}

func ExampleFunction(ctx context.Context, m pubSubMessage) error {
	// 処理...
	return nil
}

この ctx から metadata 関数を使用することで、EventID を取得することができます。 これが先程の messageIds の値と一致します。

import 	"cloud.google.com/go/functions/metadata"

// ...

meta, err := metadata.FromContext(ctx)
_ = meta.EventID

今回はこいつをログに仕込むことで、pubsub の push 元からエラーが起きたかどうか、分かるようにすることにしました。

log に EventID を仕込む

gcp では、cloud logging に適切に登録できれば、絞り込みを使用して log を特定することは容易です。 特にX-Cloud-Trace-Context が取れれば logging agent で簡単に cloud logging に登録ができたのですが、 pubsub 経由の cloud function からは取れなかったため、cloud.google.com/go/logging を使用した logging を作成しました(頑張れば取れた?)。 実際のコードは以下のような形です。

var baseLog *logging.Logger

type logger struct {
	traceID string
	spanID  string
	eventID string
	*logging.Logger
}

type Payload map[string]interface{}

func NewLogger(ctx context.Context) (*logger, *metadata.Metadata) {
	sctx := trace.FromContext(ctx).SpanContext()
	meta, err := metadata.FromContext(ctx)
	if err != nil {
		panic(err)
	}
	return &logger{
		traceID: sctx.TraceID.String(),
		eventID: meta.EventID,
		spanID:  sctx.SpanID.String(),
		Logger:  baseLog,
	}, meta
}

func (l *logger) Info(msg string, payload Payload) {
	payload["text"] = msg
	l.Log(logging.Info, payload)
}

func (l *logger) Warn(msg string, payload Payload) {
	payload["text"] = msg
	l.Log(logging.Warning, payload)
}

func (l *logger) Error(msg string, payload Payload) {
	payload["text"] = msg
	l.Log(logging.Error, payload)
}

func (l *logger) Panic(msg string, err error, payload Payload) {
	payload["text"] = msg
	l.Log(logging.Emergency, payload)
	panic(err)
}

func (l *logger) Log(level logging.Severity, payload Payload) {
	pc, file, line, _ := runtime.Caller(2)
	l.Logger.Log(logging.Entry{
		Severity: level,
		Payload:  payload,
		Labels: map[string]string{
			"event": l.eventID,
		},
		Resource: &monitoredres.MonitoredResource{
			Type: "cloud_function",
			Labels: map[string]string{
				"function_name": "ExampleFunction",
				"region":        "asia-northeast1",
				"project_id":    projectID,
			},
		},
		Trace:        "projects/" + projectID + "/traces/" + l.traceID,
		SpanID:       l.spanID,
		TraceSampled: true,
		SourceLocation: &logging2.LogEntrySourceLocation{
			File:     file,
			Line:     int64(line),
			Function: runtime.FuncForPC(pc).Name(),
		},
	})
}

後は以上のコードを logger として使用し、エラー発生時に適切なログを吐くようにします。

l.Info("start fuction", Payload{
	"event": meta.EventID,
})

l.Error("check exist", Payload{
	"error":   err.Error(),
	"eventID": meta.EventID,
})

特定のログを取得

関数側の準備もできたため、あとは以下のことを行なうスクリプトを書くのみです。 ログが取得できないときもエラーとしたかったため、エラーレベルでの絞りこみはなしにしました。

  1. pubsub に push
  2. 関数の実行が終わるまで待機
  3. messageids より実行ログを取得
  4. 実行ログが存在すること、error や warn がないことを確認

上記の流れのスクリプトは以下です。

#!/bin/bash

set -ux

project=$1

message_id=$(gcloud pubsub topics publish sample-topic \
       --attribute=[from="test-cli"] \
       --message='{"type":"test", "body": "テスト"}' \
       --project $1)
id=$(echo $message_id | sed -e "s/messageIds: - //g" | sed -e "s/'//g")

echo "sleeping..."
sleep 10

log=$(gcloud logging read labels.event="${id}")
if [ -z "${log}" ]; then
    echo "log is empty"
    exit 1;
fi

errors=$(echo $log | grep -E "severity: (WARNING|ERROR|EMERGENCY)")

if [ -z "${errors}" ]; then
    echo "ok"
    exit 0;
fi

echo "${errors} is happened"
exit 1

CI 実行

あとはこのスクリプトを CI で実行してあげる感じです。権限として以下の権限が必要でした。

  • Logs Viewer
  • Pub/Sub Publisher

まとめ

結合テストを行ないたかったので pubsub & cloud function の結合テストを自動化するのを頑張ってみました。 cloud logging を使う必要があるため、簡単に導入するには難しいですが、できると心理的安全があるので、やってよかったとは思います。

個人的には、cloud function の実行の終了を sleep で待っているのが微妙なので、何とかしたかったんですが、特に思いつかなかったです…😢