Cloud Storageのオブジェクト生成、削除などをトリガーにCloud Functionsを実行する

以前、「Cloud SchedulerでGoのCloud Functionsを定期的に実行する 」という記事で、Cloud SchedulerでCloud Functionsを定期実行する方法について書きました。その中で、Cloud Functionsの中で、ファイルをCloud Storageにアップロードするという処理を行いました。今回は、Cloud Storageのオブジェクト生成、削除などをトリガーに、Cloud Functionsを実行する方法を試してみたので纏めておきます。

概要

Cloud Storage内のイベントをトリガーにCloud Functionsを実行するには、PubSubを使います。(またかって感じです。) 簡単な図にすると、↓のような構成になります。

gcs-pubsub-gcf

上記を実現する手順は、

  • PubSubのトピックを作成する
  • Cloud Storageのバケットに対して、PubSubへメッセージをpublishできるよう設定する
  • Cloud Storageからのメッセージを処理するできるCloud Functionsを実装し、デプロイする

という流れになります。

PubSubのトピックの作成

PubSubのトピックの作成方法については、特に迷うこと無くできるので、割愛します。

Cloud Storageの設定

Cloud Storageのバケットに対して、PubSubへメッセージをpublishできるようにするには、gsutilコマンドを使う必要があります。(現状では、GCPのコンソールページからは、この設定はできないようです。)

具体的には、gsutilsが使える端末で、以下のコマンドを実行します。

gsutil notification create -t TOPIC_NAME -f json gs://BUCKET_NAME

TOPIC_NAMEには、メッセージの宛先となるPubSubのトピック名を設定します。また、BUCKET_NAMEには、PubSubへの通知を有効にしたいCloud Storageのバケット名を指定します。この設定や手順の公式ドキュメントは「Pub/Sub Notifications for Cloud Storage の使用法」にあります。

この設定を行うと、バケット内の種々のイベントがPubSubへpublishされます。 詳細は「Cloud Storage の Pub/Sub 通知」に書かれています。

Cloud Functionsの実装

ここでは、Go言語でCloud Storageからのメッセージを処理する方法を書いておきます。 Cloud SchedulerでGoのCloud Functionsを定期的に実行する でも書いたとおり、GoでPubSubのメッセージをトリガーとするCloud Functionsのシグネチャは、

package mypkg

import (
    "context"

    "cloud.google.com/go/pubsub"
)

func DoSomething(ctx context.Context, m *pubsub.Message) error {
    // Do Something
    return nil
}

のようになります。 この関数の引数である*pubsub.MessageAttributesフィールドに、Cloud Storageで発生したイベントの詳細が格納されており、これにアクセスすればmap[string]stringとして参照することができます。具体的に参照できる値としては、

  • イベント発生日時
  • イベント種別
  • バケット名
  • オブジェクト名

などがあります。詳細はここに記載があります。

イベント種別とバケット、オブジェクトがわかれば、イベント種別に応じていろいろなことができそうです。例えば、新しいオブジェクトが作成された場合に、そのオブジェクトを解析したり、その中身を別の形式で保存する、とか、オブジェクトが削除された場合に、それを何かしらに通知する、とか。

Cloud Storageで発生したイベントをSlackに通知するサンプル

試しに、Cloud Storageで発生したイベントをSlackに通知するサンプルを作ってみました。 コードは↓のようになりました。

package mypkg

import (
	"context"
	"fmt"
	"os"

	"cloud.google.com/go/pubsub"
	"github.com/slack-go/slack"
)

func DoSomething(ctx context.Context, m *pubsub.Message) error {
	data := m.Attributes
	ts := data["eventTime"]
	eventType := data["eventType"]
	bucket := data["bucketId"]
	object := data["objectId"]

	msg := fmt.Sprintf("[%s] eventType: %s, bucket: %s, object: %s", ts, eventType, bucket, object)
	return notify(msg)
}

func notify(msg string) error {
	token := os.Getenv("SLACK_TOKEN")
	channelID := os.Getenv("SLACK_CHANNEL_ID")
	api := slack.New(token)
	attachment := slack.Attachment{}
	_, _, err := api.PostMessage(
		channelID,
		slack.MsgOptionText(msg, false),
		slack.MsgOptionAttachments(attachment),
		slack.MsgOptionAsUser(true),
	)
	return err
}

終わりに

Cloud Storageのオブジェクト生成、削除などをトリガーにCloud Functionsを実行する方法について纏めました。Cloud SchedulerでGoのCloud Functionsを定期的に実行する と今回の内容を組み合わせて、引き続きゴニョゴニョやっていきます。