从 Apache Hive 迁移架构和数据

本文档介绍了如何将数据、安全设置和流水线从 Apache Hive 迁移到 BigQuery。

您还可以使用批量 SQL 转换来批量迁移 SQL 脚本,或使用交互式 SQL 转换来转换临时查询。这两种 SQL 转换服务完全支持 Apache HiveQL。

准备迁移

以下各部分介绍了如何收集表统计信息、元数据和安全设置的相关信息,以帮助您将数据仓库从 Hive 迁移到 BigQuery。

收集源表信息

收集有关来源 Hive 表的信息,例如其行数、列数、列数据类型、大小、数据的输入格式和位置。此信息可用于迁移过程,也可用于验证数据迁移。如果您在名为 corp 的数据库中有一个名为 employees 的 Hive 表,请使用以下命令收集表信息:

# Find the number of rows in the table
hive> SELECT COUNT(*) FROM corp.employees;

# Output all the columns and their data types
hive> DESCRIBE corp.employees;

# Output the input format and location of the table
hive> SHOW CREATE TABLE corp.employees;
Output:
…
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION
  'hdfs://demo_cluster/user/hive/warehouse/corp/employees'
TBLPROPERTIES (
…

# Get the total size of the table data in bytes
shell> hdfs dfs -du -s TABLE_LOCATION

源表格式转换

Hive 支持的某些格式无法直接提取到 BigQuery 中。

Hive 支持按以下格式存储数据:

  • 文本文件
  • RC 文件
  • 序列文件
  • Avro 文件
  • ORC 文件
  • Parquet 文件

BigQuery 支持从 Cloud Storage 加载采用以下任意一种文件格式的数据:

  • CSV
  • JSON(以换行符分隔)
  • Avro
  • ORC
  • Parquet

BigQuery 可以直接加载 Avro、ORC 和 Parquet 格式的数据文件,而无需架构文件。对于未格式化为 CSV 或 JSON(以换行符分隔)的文本文件,您可以将数据复制到 Avro 格式的 Hive 表,也可以将表架构转换为 BigQueryJSON 架构提取时提供。

收集 Hive 访问控制设置

Hive 和 BigQuery 具有不同的访问控制机制。收集所有 Hive 访问权限控制设置,例如角色、组、成员以及向其授予的权限。在 BigQuery 中按数据集级别绘制安全模型并实施细粒度 ACL。例如,可以将 Hive 用户映射到 Google 账号,并可以将 HDFS 组映射到 Google 组。可以在数据集级层设置访问权限。使用以下命令在 Hive 中收集访问权限控制设置:

# List all the users
> hdfs dfs -ls /user/ | cut -d/ -f3

# Show all the groups that a specific user belongs to
> hdfs groups user_name

# List all the roles
hive> SHOW ROLES;

# Show all the roles assigned to a specific group
hive> SHOW ROLE GRANT GROUP group_name

# Show all the grants for a specific role
hive> SHOW GRANT ROLE role_name;

# Show all the grants for a specific role on a specific object
hive> SHOW GRANT ROLE role_name on object_type object_name;

在 Hive 中,如果您拥有所需的权限,则可以直接访问表后面的 HDFS 文件。在标准 BigQuery 表中,将数据加载到表中后,数据会存储在 BigQuery 存储空间中。您可以使用 BigQuery Storage Read API 读取数据,但所有 IAM、行级和列级安全性仍会强制执行。如果您使用 BigQuery 外部表查询 Cloud Storage 中的数据,则对 Cloud Storage 的访问权限也由 IAM 控制。

您可以创建一个 BigLake 表,让您可以使用连接器通过 Apache Spark、Trino 或 Apache Hive 查询数据。BigQuery Storage API 会对 Cloud Storage 或 BigQuery 中的所有 BigLake 表强制执行行级和列级治理政策。

数据迁移

将 Hive 数据从本地或其他基于云的源集群迁移到 BigQuery 需要两个步骤:

  1. 将数据从源集群复制到 Cloud Storage
  2. 将数据从 Cloud Storage 加载到 BigQuery

以下部分介绍了如何迁移 Hive 数据、验证迁移的数据以及处理持续提取的数据的迁移。这些示例是为非 ACID 表编写。

分区列数据

在 Hive 中,分区表中的数据存储在目录结构中。表的每个分区都与分区列的特定值相关联。数据文件本身不包含任何分区列的数据。使用 SHOW PARTITIONS 命令列出分区表中的不同分区。

以下示例显示源 Hive 表按 joining_datedepartment 列进行分区。此表下的数据文件不包含与这两列相关的任何数据。

hive> SHOW PARTITIONS corp.employees_partitioned
joining_date="2018-10-01"/department="HR"
joining_date="2018-10-01"/department="Analyst"
joining_date="2018-11-01"/department="HR"

如需复制这些列,一种方法是将分区表转换为非分区表,然后再加载到 BigQuery 中:

  1. 创建一个架构与分区表类似的非分区表。
  2. 将数据从源分区表加载到非分区表中。
  3. 将暂存非分区表下的这些数据文件复制到 Cloud Storage。
  4. 使用 bq load 命令将数据加载到 BigQuery 中,并提供 TIMESTAMPDATE 类型分区列的名称(如果有的话)作为 time_partitioning_field 参数。

将数据复制到 Cloud Storage

数据迁移的第一步是将数据复制到 Cloud Storage。使用 Hadoop DistCp 将数据从本地或其他云集群复制到 Cloud Storage。将数据存储在与您希望在 BigQuery 中存储数据的数据集位于同一区域或多区域的存储桶中。例如,如果要使用现有 BigQuery 数据集作为东京区域中的目的地,则必须在东京选择 Cloud Storage 区域存储桶以保存数据。

选择 Cloud Storage 存储桶位置后,您可以使用以下命令列出 employees Hive 表位置中存在的所有数据文件:

> hdfs dfs -ls hdfs://demo_cluster/user/hive/warehouse/corp/employees
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000000_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000001_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000002_0

将上述所有文件复制到 Cloud Storage:

> hadoop distcp
hdfs://demo_cluster/user/hive/warehouse/corp/employees
gs://hive_data/corp/employees

请注意,您需要按照数据存储价格支付在 Cloud Storage 中存储数据的费用。

可能存在暂存目录,其中包含为查询作业创建的中间文件。您必须确保在运行 bq load 命令之前删除所有此类目录。

加载数据

BigQuery 支持从 Cloud Storage 以多种格式批量加载数据。在创建加载作业之前,确保要将数据加载到的 BigQuery 数据集已存在。

以下命令显示了从 Hive 为非 ACID 表复制的数据:

> gsutil ls gs://hive_data/corp/employees/
gs://hive-migration/corp/employees/
gs://hive-migration/corp/employees/000000_0
gs://hive-migration/corp/employees/000001_0
gs://hive-migration/corp/employees/000002_0

如需将 Hive 数据加载到 BigQuery,请使用 bq load 命令。您可以在网址中使用通配符 *,从共用对象前缀的多个文件加载数据。例如,使用以下命令加载共享前缀 gs://hive_data/corp/employees/ 的所有文件:

bq load --source_format=AVRO corp.employees gs://hive_data/corp/employees/*

由于作业可能需要很长时间才能完成,您可以通过将 --sync 标志设置为 False 来异步执行作业。运行 bq load 命令会输出已创建的加载作业的 ID,因此您可以使用此命令轮询作业状态。此类数据包括作业类型、作业状态和运行作业的用户等详细信息。

使用其各自的作业 ID 轮询每个加载作业的状态,并检查任何因错误而失败的作业。如果发生故障,BigQuery 会在将数据加载到表中时使用“全部或无”方法。您可以尝试解决错误,并安全地重新创建其他加载作业。如需了解详情,请参阅排查错误

确保每个表和项目具有足够的加载作业配额。如果超出配额,则加载作业将失败并显示 quotaExceeded 错误。

请注意,您无需为将数据从 Cloud Storage 加载到 BigQuery 的加载操作付费。数据加载到 BigQuery 后,系统即会按照 BigQuery 的存储价格收费。加载作业成功完成后,您可以删除 Cloud Storage 中的所有剩余文件,以避免因存储冗余数据而产生费用。

验证

成功加载数据后,您可以通过比较 Hive 中的行数和 BigQuery 表来验证迁移的数据。查看表信息以获取有关 BigQuery 表的详细信息,例如行数、列数、分区字段或集群字段。要进行其他验证,请考虑使用数据验证工具

持续提取

如果您持续将数据提取到 Hive 表中,请执行初始迁移,然后仅将增量数据更改迁移到 BigQuery。常见的做法是创建反复运行的脚本以查找和加载新数据。为此,可以采用多种方法,以下部分介绍了一种可能的方法。

您可以在 Cloud SQL 数据库表中跟踪迁移进度,此表在下面的部分中称为跟踪表。在首次运行迁移期间,请将进度存储在跟踪表中。对于后续的迁移运行,使用跟踪表信息来检测是否有任何其他数据已提取并且可以迁移到 BigQuery。

选择 INT64TIMESTAMPDATE 类型标识符列以区分增量数据。这称为增量列。

下表是一个没有分区的表的示例,此表使用 TIMESTAMP 类型作为其增量列:

+-----------------------------+-----------+-----------+-----------+-----------+
| timestamp_identifier        | column_2  | column_3  | column_4  | column_5  |
+-----------------------------+-----------+-----------+-----------+-----------+
| 2018-10-10 21\:56\:41       |           |           |           |           |
| 2018-10-11 03\:13\:25       |           |           |           |           |
| 2018-10-11 08\:25\:32       |           |           |           |           |
| 2018-10-12 05\:02\:16       |           |           |           |           |
| 2018-10-12 15\:21\:45       |           |           |           |           |
+-----------------------------+-----------+-----------+-----------+-----------+

下表是按 DATE 类型列 partition_column 分区的表示例。它在每个分区中都有一个整数类型增量列 int_identifier

+---------------------+---------------------+----------+----------+-----------+
| partition_column    | int_identifier      | column_3 | column_4 | column_5  |
+---------------------+---------------------+----------+----------+-----------+
| 2018-10-01          | 1                   |          |          |           |
| 2018-10-01          | 2                   |          |          |           |
| ...                 | ...                 |          |          |           |
| 2018-10-01          | 1000                |          |          |           |
| 2018-11-01          | 1                   |          |          |           |
| 2018-11-01          | 2                   |          |          |           |
| ...                 | ...                 |          |          |           |
| 2018-11-01          | 2000                |          |          |           |
+---------------------+---------------------+----------+----------+-----------+

以下部分介绍如何根据 Hive 是否进行分区以及是否具有增量列来迁移 Hive 数据。

没有增量列的非分区表

假设 Hive 中没有文件压缩,Hive 会在提取新数据时创建新的数据文件。在第一次运行期间,将文件列表存储在跟踪表中,并通过将这些文件复制到 Cloud Storage 且将它们加载到 BigQuery 中来完成 Hive 表的初始迁移。

> hdfs dfs -ls hdfs://demo_cluster/user/hive/warehouse/corp/employees
Found 3 items
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000000_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000001_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000002_0

初始迁移后,Hive 中会摄取一些数据。您只需将此增量数据迁移到 BigQuery。在后续迁移运行中,再次列出数据文件,并将其与跟踪表中的信息进行比较,以检测尚未迁移的新数据文件。

> hdfs dfs -ls hdfs://demo_cluster/user/hive/warehouse/corp/employees
Found 5 items
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000000_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000001_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000002_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000003_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000004_0

在此示例中,表位置存在两个新文件。如需迁移数据,请将这些新数据文件复制到 Cloud Storage,然后加载到现有 BigQuery 表中。

包含增量列的非分区表

在这种情况下,您可以使用增量列的最大值来确定是否添加了任何新数据。在执行初始迁移时,查询 Hive 表以获取增量列的最大值,并将其存储在跟踪表中:

hive> SELECT MAX(timestamp_identifier) FROM corp.employees;
2018-12-31 22:15:04

在后续的迁移过程中,再次重复相同的查询,获取增量列的当前最大值,并将其与跟踪表中先前的最大值进行比较,以检查是否存在增量数据:

hive> SELECT MAX(timestamp_identifier) FROM corp.employees;
2019-01-04 07:21:16

如果当前最大值大于前一个最大值,则表示增量数据已提取到 Hive 表中,如示例中所示。如需迁移增量数据,请创建暂存表并仅将增量数据加载到表中。

hive> CREATE TABLE stage_employees LIKE corp.employees;
hive> INSERT INTO TABLE stage_employees SELECT * FROM corp.employees WHERE timestamp_identifier>"2018-12-31 22:15:04" and timestamp_identifier<="2019-01-04 07:21:16"

如需迁移暂存表,请列出 HDFS 数据文件并将其复制到 Cloud Storage,然后加载到现有 BigQuery 表中。

没有增量列的分区表

将数据提取到分区表中可能会创建新分区和/或将增量数据附加到现有分区。在这种情况下,您可以识别这些更新后的分区,但无法轻松识别已添加到这些现有分区的数据,因为没有要区分的增量列。另一种方法是截取和维护 HDFS 快照,但快照会给 Hive 带来性能问题,因此通常会予以停用。

首次迁移表时,请运行 SHOW PARTITIONS 命令并将有关不同分区的信息存储在跟踪表中。

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01

以上输出会显示表 employees 有两个分区。下面提供了跟踪表的简化版本,以显示如何存储此信息。

partition_information file_path gcs_copy_status gcs_file_path bq_job_id ...
partition_column =2018-10-01
partition_column =2018-11-01

在随后的迁移运行中,再次运行 SHOW PARTITIONS 命令列出所有分区,并将这些分区与跟踪表中的分区信息进行比较,以检查是否存在尚未迁移的新分区。

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01
partition_column=2018-12-01
partition_column=2019-01-01

如果此示例中识别出任何新分区,请创建一个暂存表,并从源表中仅加载新分区。如需迁移暂存表,请将文件复制到 Cloud Storage 并将其加载到现有 BigQuery 表中。

使用增量列的分区表

在这种情况下,Hive 表会进行分区,每个分区中都有一个增量列。连续提取的数据会在此列值上递增。在这里,您可以迁移上一部分中所述的新分区,还可以迁移已提取到现有分区中的增量数据。

首次迁移表时,将每个分区中增量列的最小值和最大值与跟踪表中的表分区信息一起存储。

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01

hive> SELECT MIN(int_identifier),MAX(int_identifier) FROM corp.employees WHERE partition_column="2018-10-01";
1 1000

hive> SELECT MIN(int_identifier),MAX(int_identifier) FROM corp.employees WHERE partition_column="2018-11-01";
1 2000

以上输出显示 employees 表有两个分区,以及每个分区中增量列的最小值和最大值。下面提供了跟踪表的简化版本,以显示如何存储此信息。

partition_information inc_col_min inc_col_max file_path gcs_copy_status ...
partition_column =2018-10-01 1 1000
partition_column =2018-11-01 1 2000

在后续运行中,运行相同的查询以获取每个分区中的当前最大值,并将其与跟踪表中的先前最大值进行比较。

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01
partition_column=2018-12-01
partition_column=2019-01-01

hive> SELECT MIN(int_identifier),MAX(int_identifier) FROM corp.employees WHERE partition_column="2018-10-01";

在此示例中,确定了两个新分区,并且在现有分区 partition_column=2018-10-01 中提取了一些增量数据。如果存在任何增量数据,请创建暂存表,仅将增量数据加载到暂存表中,将数据复制到 Cloud Storage,然后将数据加载到现有的 BigQuery 表中。

安全设置

BigQuery 使用 IAM 来管理对资源的访问权限。BigQuery 预定义角色为特定服务提供精细访问权限,旨在支持常见的使用场景和访问权限控制模式。您可以使用自定义角色来自定义一组权限,从而提供更精细的访问权限。

和数据集的访问权限控制可指定允许用户、群组和服务账号对表、视图和数据集执行的操作。授权视图可让您与特定用户和群组共享查询结果,而无需为其授予底层源数据的访问权限。借助行级安全性列级安全性,您可以限制谁可以访问表中的哪些行或列。借助数据遮盖,您可以为用户群组选择性地遮盖列数据,但仍然允许他们访问该列。

应用访问权限控制时,您可以向以下用户和群组授予访问权限:

  • 用户(按电子邮件地址)(User by e-mail):向某个 Google 账号授予对数据集的访问权限
  • 群组(按电子邮件地址)(Group by e-mail):向某个 Google 群组的所有成员授予对数据集的访问权限
  • 网域:向某个 Google 网域中的所有用户和群组授予对数据集的访问权限
  • 所有经过身份验证的用户 (All Authenticated Users):向所有 Google 账号拥有者授予对数据集的访问权限(将数据集设为公开)
  • 项目所有者 (Project Owners):向所有项目所有者授予对数据集的访问权限
  • 项目查看者 (Project Viewers):向所有项目查看者授予对数据集的访问权限
  • 项目编辑者 (Project Editors):向所有项目编辑者授予对数据集的访问权限
  • 已获授权的视图:向视图授予对数据集的访问权限

数据流水线更改

以下部分讨论了从 Hive 迁移到 BigQuery 时如何更改数据流水线。

Sqoop

如果现有流水线使用 Sqoop 将数据导入 HDFS 或 Hive 进行处理,请修改作业以将数据导入 Cloud Storage。

如果要将数据导入 HDFS,请选择以下选项之一:

如果您希望 Sqoop 将数据导入 Google Cloud 上运行的 Hive,请将其直接指向 Hive 表,并将 Cloud Storage 用作 Hive 仓库(而不是 HDFS)。为此,请将属性 hive.metastore.warehouse.dir 设置为 Cloud Storage 存储桶。

您可以使用 Dataproc 提交 Sqoop 作业,以将数据导入 BigQuery,从而在不管理 Hadoop 集群的情况下运行 Sqoop 作业。

Spark SQL 和 HiveQL

批量 SQL 转换交互式 SQL 转换可以将 Spark SQL 或 HiveQL 自动转换为 GoogleSQL。

如果您不想将 Spark SQL 或 HiveQL 迁移到 BigQuery,则可以使用 Dataproc 或将 BigQuery 连接器与 Apache Spark 搭配使用

Hive ETL

如果 Hive 中有任何现有的 ETL 作业,您可以通过以下方式修改它们,以从 Hive 迁移这些作业:

  • 使用批量 SQL 转换器将 Hive ETL 作业转换为 BigQuery 作业。
  • 使用 Apache Spark 通过 BigQuery 连接器读取和写入 BigQuery。借助 Dataproc,您可以在临时集群的帮助下以经济实惠的方式运行 Spark 作业。
  • 使用 Apache Beam SDK 重写流水线,然后在 Dataflow 上运行。
  • 使用 Apache Beam SQL 重写流水线。

如需管理 ETL 流水线,您可以使用 Cloud Composer (Apache Airflow) 和 Dataproc 工作流模板。Cloud Composer 提供了一个工具,将 Oozie 工作流转换为 Cloud Composer 工作流。

Dataflow

如果要将 Hive ETL 流水线迁移到全代管式云服务,请考虑使用 Apache Beam SDK 编写数据流水线并在 Dataflow 上运行。

Dataflow 是一种用于执行数据处理流水线的代管式服务。它执行使用开源框架 Apache Beam 编写的程序。Apache Beam 是一种统一的编程模型,通过该模型,您可以同时开发批处理和流处理流水线。

如果您的数据流水线是标准数据移动,您可以使用 Dataflow 模板快速创建 Dataflow 流水线,而无需编写代码。您可以参考此 Google 提供的模板,使用该模板从 Cloud Storage 读取文本文件、应用转换以及将结果写入 BigQuery 表。

为了进一步简化数据处理,您还可以试用 Beam SQL,以便使用类似于 SQL 的语句来处理数据。