Vertex AI Pipelines で Vertex AI TensorBoard を使用する

トレーニング コードは、カスタム トレーニング コンポーネントにパッケージ化して、パイプライン ジョブで実行できます。TensorBoard のログは、Vertex AI TensorBoard テストに自動的にストリーミングされます。このインテグレーションを使用すると、Vertex AI TensorBoard ログが Cloud Storage に書き込まれる際に、そのログの Vertex AI TensorBoard ストリームとして、ほぼリアルタイムでトレーニングをモニタリングできます。

初期設定については、Vertex AI TensorBoard を設定するをご覧ください。

トレーニング スクリプトの変更点

トレーニング スクリプトは、TensorBoard ログを Cloud Storage バケットに書き込むように構成する必要があります。Cloud Strage バケットの場所は、Vertex AI Training サービスにより事前定義の環境変数 AIP_TENSORBOARD_LOG_DIR で自動的に利用可能にされます。

通常これは、オープンソースの TensorBoard ログ書き込み API に、ログ ディレクトリとして os.environ['AIP_TENSORBOARD_LOG_DIR'] を指定することで実現できます。AIP_TENSORBOARD_LOG_DIR の場所は、staging_bucket 変数で設定します。

TensorFlow 2.x でトレーニング スクリプトを構成するには、TensorBoard コールバックを作成して、log_dir 変数を os.environ['AIP_TENSORBOARD_LOG_DIR'] に設定します。これにより、TensorBoard コールバックが、TensorFlow model.fit コールバックのリストに含まれます。

  tensorboard_callback = tf.keras.callbacks.TensorBoard(
       log_dir=os.environ['AIP_TENSORBOARD_LOG_DIR'],
       histogram_freq=1
  )

  model.fit(
       x=x_train,
       y=y_train,
       epochs=epochs,
       validation_data=(x_test, y_test),
       callbacks=[tensorboard_callback],
  )
  

詳しくは、Vertex AI がカスタム トレーニング環境で環境変数を設定する方法をご覧ください。

パイプラインを構築して実行する

次の例では、Kubeflow Pipelines DSL パッケージでパイプラインを構築して実行する方法を示します。その他の例と詳細については、Vertex AI Pipelines のドキュメントをご覧ください。

トレーニング コンポーネントを作成する

トレーニング コードをカスタム コンポーネントにパッケージ化します。そのコードでは、TensorBoard ログを Cloud Storage バケットに書き込むように構成してください。他の例については、独自のパイプライン コンポーネントの構築をご覧ください。

from kfp.v2.dsl import component

@component(
    base_image="tensorflow/tensorflow:latest",
    packages_to_install=["tensorflow_datasets"],
)
def train_tensorflow_model_with_tensorboard():
    import datetime, os
    import tensorflow as tf

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    def create_model():
        return tf.keras.models.Sequential(
            [
                tf.keras.layers.Flatten(input_shape=(28, 28)),
                tf.keras.layers.Dense(512, activation="relu"),
            ]
        )

    model = create_model()
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )

    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=os.environ['AIP_TENSORBOARD_LOG_DIR'],
        histogram_freq=1
    )

    model.fit(
        x=x_train,
        y=y_train,
        epochs=5,
        validation_data=(x_test, y_test),
        callbacks=[tensorboard_callback],
    )

パイプラインを構築してコンパイルする

create_custom_training_job_op_from_component でコンポーネント仕様を指定し、作成したコンポーネントからカスタム トレーニング ジョブを作成します。tensorboard_resource_name を TensorBoard インスタンスに設定し、API 呼び出し中にアーティファクト(TensorBoard ログを含む)をステージングする場所に staging_bucket を設定します。

次に、このジョブを含めるパイプラインを構築し、そのパイプラインを JSON ファイルにコンパイルします。

他の例と詳細については、CustomJob コンポーネントパイプラインの構築をご覧ください。

from kfp.v2 import compiler
from google_cloud_pipeline_components.v1.custom_job.utils import \
    create_custom_training_job_op_from_component
from kfp.v2 import dsl

def create_tensorboard_pipeline_sample(
    project, location, staging_bucket, display_name, service_account, experiment, tensorboard_resource_name
):

    @dsl.pipeline(
        pipeline_root=f"{staging_bucket}/pipeline_root",
        name=display_name,
    )
    def pipeline():
        custom_job_op = create_custom_training_job_op_from_component(
            component_spec=train_tensorflow_model_with_tensorboard,
            tensorboard=tensorboard_resource_name,
            base_output_directory=staging_bucket,
            service_account=service_account,
        )
        custom_job_op(project=project, location=location)

    compiler.Compiler().compile(
        pipeline_func=pipeline, package_path=f"{display_name}.json"
    )

Vertex AI パイプラインを送信する

パイプラインは、Vertex AI SDK for Python を使用して送信します。詳細については、パイプラインを実行するをご覧ください。

Python

def log_pipeline_job_to_experiment_sample(
    experiment_name: str,
    pipeline_job_display_name: str,
    template_path: str,
    pipeline_root: str,
    project: str,
    location: str,
    parameter_values: Optional[Dict[str, Any]] = None,
):
    aiplatform.init(project=project, location=location)

    pipeline_job = aiplatform.PipelineJob(
        display_name=pipeline_job_display_name,
        template_path=template_path,
        pipeline_root=pipeline_root,
        parameter_values=parameter_values,
    )

    pipeline_job.submit(experiment=experiment_name)

  • experiment_name: テストの名前を指定します。
  • pipeline_job_display_name: パイプライン ジョブの表示名。
  • template_path: コンパイルされたパイプライン テンプレートへのパス。
  • pipeline_root: パイプライン サービス アカウントがアクセスできる Cloud Storage URI を指定します。パイプライン実行のアーティファクトはパイプライン ルート内に保存されます。
  • parameter_values: (省略可)この実行に渡すパイプライン パラメータ。たとえば、パラメータ名をディクショナリ キー、パラメータ値をディクショナリ値として dict() を作成します。
  • project: 実際のプロジェクト ID。パイプラインを実行する Google Cloud プロジェクト。ID は、Google Cloud コンソールの [ようこそ] ページで確認できます。
  • location: パイプラインを実行するリージョン。これは、使用している TensorBoard インスタンスと同じリージョンにする必要があります。

次のステップ