以前、「Cloud SchedulerでGoのCloud Functionsを定期的に実行する 」という記事で、Cloud SchedulerでCloud Functionsを定期実行する方法について書きました。その中で、Cloud Functionsの中で、ファイルをCloud Storageにアップロードするという処理を行いました。今回は、Cloud Storageのオブジェクト生成、削除などをトリガーに、Cloud Functionsを実行する方法を試してみたので纏めておきます。
Cloud Storage内のイベントをトリガーにCloud Functionsを実行するには、PubSubを使います。(またかって感じです。) 簡単な図にすると、↓のような構成になります。
上記を実現する手順は、
という流れになります。
PubSubのトピックの作成方法については、特に迷うこと無くできるので、割愛します。
Cloud Storageのバケットに対して、PubSubへメッセージをpublishできるようにするには、gsutil
コマンドを使う必要があります。(現状では、GCPのコンソールページからは、この設定はできないようです。)
具体的には、gsutils
が使える端末で、以下のコマンドを実行します。
gsutil notification create -t TOPIC_NAME -f json gs://BUCKET_NAME
TOPIC_NAME
には、メッセージの宛先となるPubSubのトピック名を設定します。また、BUCKET_NAME
には、PubSubへの通知を有効にしたいCloud Storageのバケット名を指定します。この設定や手順の公式ドキュメントは「Pub/Sub Notifications for Cloud Storage の使用法」にあります。
この設定を行うと、バケット内の種々のイベントがPubSubへpublishされます。 詳細は「Cloud Storage の Pub/Sub 通知」に書かれています。
ここでは、Go言語でCloud Storageからのメッセージを処理する方法を書いておきます。 Cloud SchedulerでGoのCloud Functionsを定期的に実行する でも書いたとおり、GoでPubSubのメッセージをトリガーとするCloud Functionsのシグネチャは、
package mypkg
import (
"context"
"cloud.google.com/go/pubsub"
)
func DoSomething(ctx context.Context, m *pubsub.Message) error {
// Do Something
return nil
}
のようになります。
この関数の引数である*pubsub.MessageのAttributes
フィールドに、Cloud Storageで発生したイベントの詳細が格納されており、これにアクセスすればmap[string]string
として参照することができます。具体的に参照できる値としては、
などがあります。詳細はここに記載があります。
イベント種別とバケット、オブジェクトがわかれば、イベント種別に応じていろいろなことができそうです。例えば、新しいオブジェクトが作成された場合に、そのオブジェクトを解析したり、その中身を別の形式で保存する、とか、オブジェクトが削除された場合に、それを何かしらに通知する、とか。
試しに、Cloud Storageで発生したイベントをSlackに通知するサンプルを作ってみました。 コードは↓のようになりました。
package mypkg
import (
"context"
"fmt"
"os"
"cloud.google.com/go/pubsub"
"github.com/slack-go/slack"
)
func DoSomething(ctx context.Context, m *pubsub.Message) error {
data := m.Attributes
ts := data["eventTime"]
eventType := data["eventType"]
bucket := data["bucketId"]
object := data["objectId"]
msg := fmt.Sprintf("[%s] eventType: %s, bucket: %s, object: %s", ts, eventType, bucket, object)
return notify(msg)
}
func notify(msg string) error {
token := os.Getenv("SLACK_TOKEN")
channelID := os.Getenv("SLACK_CHANNEL_ID")
api := slack.New(token)
attachment := slack.Attachment{}
_, _, err := api.PostMessage(
channelID,
slack.MsgOptionText(msg, false),
slack.MsgOptionAttachments(attachment),
slack.MsgOptionAsUser(true),
)
return err
}
Cloud Storageのオブジェクト生成、削除などをトリガーにCloud Functionsを実行する方法について纏めました。Cloud SchedulerでGoのCloud Functionsを定期的に実行する と今回の内容を組み合わせて、引き続きゴニョゴニョやっていきます。