将 Vertex AI TensorBoard 与 Vertex AI Pipelines 搭配使用

您的训练代码可以打包到自定义训练组件中,并在流水线作业中运行。TensorBoard 日志会自动流式传输到 Vertex AI TensorBoard 实验。您可以使用此集成来近乎实时地监控训练,因为 Vertex AI TensorBoard 会在写入 Cloud Storage 时流式传输 Vertex AI TensorBoard 日志。

如需了解初始设置,请参阅为 Vertex AI TensorBoard 设置

对训练脚本的更改

您的训练脚本必须配置为将 TensorBoard 日志写入 Cloud Storage 存储桶,即 Vertex AI Training 服务通过预定义的环境变量 AIP_TENSORBOARD_LOG_DIR 自动提供的位置。

这通常可以通过将 os.environ['AIP_TENSORBOARD_LOG_DIR'] 作为日志目录提供给开源 TensorBoard 日志写入 API 来实现。 AIP_TENSORBOARD_LOG_DIR 的位置通常使用 staging_bucket 变量设置。

如需在 TensorFlow 2.x 中配置训练脚本,请创建 TensorBoard 回调并将 log_dir 变量设置为 os.environ['AIP_TENSORBOARD_LOG_DIR']。然后,TensorBoard 回调会添加到 TensorFlow model.fit 回调列表中。

  tensorboard_callback = tf.keras.callbacks.TensorBoard(
       log_dir=os.environ['AIP_TENSORBOARD_LOG_DIR'],
       histogram_freq=1
  )

  model.fit(
       x=x_train,
       y=y_train,
       epochs=epochs,
       validation_data=(x_test, y_test),
       callbacks=[tensorboard_callback],
  )
  

详细了解 Vertex AI 如何在自定义训练环境中设置环境变量。

构建并运行流水线

以下示例展示了如何使用 Kubeflow Pipelines DSL 软件包构建和运行流水线。如需查看更多示例和其他详细信息,请参阅 Vertex AI Pipelines 文档

创建训练组件

将训练代码打包到自定义组件中,确保代码已配置为将 TensorBoard 日志写入 Cloud Storage 存储桶。如需查看更多示例,请参阅构建自己的流水线组件

from kfp.v2.dsl import component

@component(
    base_image="tensorflow/tensorflow:latest",
    packages_to_install=["tensorflow_datasets"],
)
def train_tensorflow_model_with_tensorboard():
    import datetime, os
    import tensorflow as tf

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    def create_model():
        return tf.keras.models.Sequential(
            [
                tf.keras.layers.Flatten(input_shape=(28, 28)),
                tf.keras.layers.Dense(512, activation="relu"),
            ]
        )

    model = create_model()
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )

    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=os.environ['AIP_TENSORBOARD_LOG_DIR'],
        histogram_freq=1
    )

    model.fit(
        x=x_train,
        y=y_train,
        epochs=5,
        validation_data=(x_test, y_test),
        callbacks=[tensorboard_callback],
    )

构建和编译流水线

通过在 create_custom_training_job_op_from_component 中指定组件规范,从您创建的组件创建自定义训练作业。将 tensorboard_resource_name 设置为 TensorBoard 实例,并将 staging_bucket 设置为在 API 调用期间暂存工件的位置(包括 TensorBoard 日志)。

然后,构建流水线以包含此作业,并将流水线编译为 JSON 文件。

如需查看更多示例和信息,请参阅自定义作业组件构建流水线

from kfp.v2 import compiler
from google_cloud_pipeline_components.v1.custom_job.utils import \
    create_custom_training_job_op_from_component
from kfp.v2 import dsl

def create_tensorboard_pipeline_sample(
    project, location, staging_bucket, display_name, service_account, experiment, tensorboard_resource_name
):

    @dsl.pipeline(
        pipeline_root=f"{staging_bucket}/pipeline_root",
        name=display_name,
    )
    def pipeline():
        custom_job_op = create_custom_training_job_op_from_component(
            component_spec=train_tensorflow_model_with_tensorboard,
            tensorboard=tensorboard_resource_name,
            base_output_directory=staging_bucket,
            service_account=service_account,
        )
        custom_job_op(project=project, location=location)

    compiler.Compiler().compile(
        pipeline_func=pipeline, package_path=f"{display_name}.json"
    )

提交 Vertex AI 流水线

使用 Python 版 Vertex AI SDK 提交流水线。如需了解详情,请参阅运行流水线

Python

def log_pipeline_job_to_experiment_sample(
    experiment_name: str,
    pipeline_job_display_name: str,
    template_path: str,
    pipeline_root: str,
    project: str,
    location: str,
    parameter_values: Optional[Dict[str, Any]] = None,
):
    aiplatform.init(project=project, location=location)

    pipeline_job = aiplatform.PipelineJob(
        display_name=pipeline_job_display_name,
        template_path=template_path,
        pipeline_root=pipeline_root,
        parameter_values=parameter_values,
    )

    pipeline_job.submit(experiment=experiment_name)

  • experiment_name:提供实验的名称。
  • pipeline_job_display_name:流水线作业的显示名。
  • template_path:已编译的流水线模板的路径。
  • pipeline_root:指定流水线服务账号可以访问的 Cloud Storage URI。流水线运行的工件存储在流水线根目录中。
  • parameter_values:要传递给此运行的流水线参数。例如,创建一个 dict(),以参数名称为字典键,以参数值为字典值。
  • project:您的项目 ID。 要在其中运行流水线的 Google Cloud 项目。您可以在 Google Cloud 控制台的欢迎页面中找到您的 ID。
  • location:需要在其中运行流水线的区域。该区域应与您使用的 TensorBoard 实例相同。

后续步骤