Migrar esquemas e dados do Apache Hive

Neste documento, descrevemos como migrar dados, configurações de segurança e pipelines do Apache Hive para o BigQuery.

Também é possível usar tradução de SQL em lote para migrar os scripts SQL em massa ou a tradução de SQL interativo para traduzir consultas ad hoc. O Apache HiveQL é totalmente compatível com os dois serviços de tradução SQL.

Preparar para a migração

Nas seções a seguir, descrevemos como coletar informações sobre estatísticas de tabela, metadados e configurações de segurança para ajudar você a migrar o data warehouse do Hive para o BigQuery.

Coletar informações da tabela de origem

Reúna informações sobre tabelas de origem do Hive, como número de linhas, número de colunas, tipos de dados da coluna, tamanho, formato de entrada dos dados e local. Essas informações são úteis no processo de migração e também para validar a migração de dados. Se você tiver uma tabela do Hive chamada employees em um banco de dados chamado corp, use os seguintes comandos para coletar informações da tabela:

# Find the number of rows in the table
hive> SELECT COUNT(*) FROM corp.employees;

# Output all the columns and their data types
hive> DESCRIBE corp.employees;

# Output the input format and location of the table
hive> SHOW CREATE TABLE corp.employees;
Output:
…
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION
  'hdfs://demo_cluster/user/hive/warehouse/corp/employees'
TBLPROPERTIES (
…

# Get the total size of the table data in bytes
shell> hdfs dfs -du -s TABLE_LOCATION

Conversão de formato de tabela de origem

Alguns dos formatos compatíveis com o Hive não podem ser ingeridos diretamente no BigQuery.

O Hive é compatível com o armazenamento de dados nos seguintes formatos:

  • Arquivo de texto
  • Arquivo RC
  • Arquivo sequencial
  • Arquivo Avro
  • Arquivo ORC
  • Arquivo Parquet

O BigQuery é compatível com carregamento de dados do Cloud Storage em qualquer um dos seguintes formatos de arquivo:

  • CSV
  • JSON (delimitado por nova linha)
  • Avro
  • ORC
  • Parquet

O BigQuery pode carregar arquivos de dados nos formatos Avro, ORC e Parquet sem precisar de arquivos de esquema. Para arquivos de texto que não estão formatados como CSV ou JSON (delimitado por Newline), é possível copiar os dados para uma tabela do Hive no formato Avro ou converter o esquema da tabela em um Esquema JSON do BigQuery a ser fornecido ao ingerir.

Coletar configurações de controle de acesso do Hive

O Hive e o BigQuery têm diferentes mecanismos de controle de acesso. Coletar todas as configurações de controle de acesso do Hive, como papéis, grupos, membros e privilégios concedidos a eles. Mapear um modelo de segurança no BigQuery por nível de conjunto de dados e implementar uma ACL detalhada. Por exemplo, um usuário do Hive pode ser mapeado para uma Conta do Google e um grupo HDFS pode ser mapeado para um Grupo do Google. O acesso pode ser definido no nível do conjunto de dados. Use os seguintes comandos para coletar configurações de controle de acesso no Hive:

# List all the users
> hdfs dfs -ls /user/ | cut -d/ -f3

# Show all the groups that a specific user belongs to
> hdfs groups user_name

# List all the roles
hive> SHOW ROLES;

# Show all the roles assigned to a specific group
hive> SHOW ROLE GRANT GROUP group_name

# Show all the grants for a specific role
hive> SHOW GRANT ROLE role_name;

# Show all the grants for a specific role on a specific object
hive> SHOW GRANT ROLE role_name on object_type object_name;

No Hive, você poderá acessar os arquivos HDFS diretamente das tabelas se tiver as permissões necessárias. Nas tabelas padrão do BigQuery, depois que os dados são carregados na tabela, eles são armazenados no BigQuery. É possível ler dados usando a API BigQuery Storage Read, mas toda a segurança no nível do IAM, da linha e da coluna ainda é aplicada. Se você estiver usando tabelas externas do BigQuery para consultar os dados no Cloud Storage, o acesso ao Cloud Storage também será controlado pelo IAM.

Você pode criar uma Tabela BigLake que permitem usar conectores para consultar os dados com o Apache Spark, Trino ou Apache Hive. A API BigQuery Storage aplica políticas de governança no nível da linha e da coluna para todas as tabelas do BigLake no Cloud Storage ou no BigQuery.

Migração de dados

A migração de dados do Hive do seu cluster local ou de outro cluster de origem baseado na nuvem para o BigQuery tem duas etapas:

  1. Copiar dados de um cluster de origem para o Cloud Storage
  2. Importar dados do Cloud Storage para o BigQuery

As seções a seguir abrangem a migração de dados do Hive, a validação de dados migrados e o tratamento da migração de dados ingeridos continuamente. Os exemplos são escritos para tabelas não ACID.

Dados da coluna de partição

No Hive, os dados em tabelas particionadas são armazenados em uma estrutura de diretórios. Cada partição da tabela está associada a um determinado valor da coluna de partição. Os próprios arquivos de dados não contêm dados das colunas de partição. Use o comando SHOW PARTITIONS para listar as diferentes partições em uma tabela particionada.

O exemplo abaixo mostra que a tabela de origem do Hive é particionada nas colunas joining_date e department. Os arquivos de dados desta tabela não contêm dados relacionados a essas duas colunas.

hive> SHOW PARTITIONS corp.employees_partitioned
joining_date="2018-10-01"/department="HR"
joining_date="2018-10-01"/department="Analyst"
joining_date="2018-11-01"/department="HR"

Uma maneira de copiar essas colunas é converter a tabela particionada em uma tabela não particionada antes de carregar no BigQuery:

  1. Crie uma tabela não particionada com esquema semelhante à tabela particionada.
  2. Carregue dados na tabela não particionada da tabela particionada de origem.
  3. Copie esses arquivos de dados na tabela não particionada para o Cloud Storage.
  4. Carregue os dados no BigQuery com o comando bq load e forneça o nome da coluna de partição do tipo TIMESTAMP ou DATE, se houver, como o argumento time_partitioning_field.

Copiar dados para o Cloud Storage

A primeira etapa na migração de dados é copiar os dados para o Cloud Storage. Use o Hadoop DistCp para copiar dados do seu cluster local ou de outra nuvem para o Cloud Storage. Armazene seus dados em um bucket na mesma região ou multirregião que o conjunto de dados em que você quer armazenar os dados no BigQuery. Por exemplo, se você quiser usar um conjunto de dados existente do BigQuery como destino que está na região de Tóquio, precisará escolher um bucket regional do Cloud Storage em Tóquio para armazenar os dados.

Depois de selecionar o local do bucket do Cloud Storage, use o seguinte comando para listar todos os arquivos de dados presentes no local da tabela do employees Hive:

> hdfs dfs -ls hdfs://demo_cluster/user/hive/warehouse/corp/employees
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000000_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000001_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000002_0

Copie todos os arquivos acima para o Cloud Storage:

> hadoop distcp
hdfs://demo_cluster/user/hive/warehouse/corp/employees
gs://hive_data/corp/employees

Você é cobrado pelo armazenamento dos dados no Cloud Storage de acordo com os Preços de armazenamento de dados.

Pode haver diretórios de preparo que mantenham arquivos intermediários criados para jobs de consulta. Certifique-se de excluir esses diretórios antes de executar o comando bq load.

Como carregar os dados

O BigQuery oferece suporte a dados de carregamento em lote em muitos formatos do Cloud Storage. Verifique se o conjunto de dados do BigQuery em que você quer carregar os dados existe antes de criar um job de carregamento.

O comando a seguir mostra os dados copiados do Hive para uma tabela não ACID:

> gsutil ls gs://hive_data/corp/employees/
gs://hive-migration/corp/employees/
gs://hive-migration/corp/employees/000000_0
gs://hive-migration/corp/employees/000001_0
gs://hive-migration/corp/employees/000002_0

Para carregar seus dados do Hive no BigQuery, use o comando bq load. É possível usar um caractere curinga * no URL para carregar dados de vários arquivos que compartilham um prefixo de objeto comum. Por exemplo, use o seguinte comando para carregar todos os arquivos que compartilham o prefixo gs://hive_data/corp/employees/:

bq load --source_format=AVRO corp.employees gs://hive_data/corp/employees/*

Como os jobs podem levar muito tempo para serem concluídos, você pode executá-los de maneira assíncrona definindo a sinalização --sync como False. A execução do comando bq load gera o ID do job de carregamento criado. Portanto, use esse comando para pesquisar o status do job. Esses dados incluem detalhes como o tipo, o estado e o usuário que executou o job.

Pesquise o status de cada job de carregamento usando o respectivo ID do job e verifique se há algum job com falha. Em caso de falha, o BigQuery usa uma abordagem "Todos ou nenhum" ao carregar dados em uma tabela. Você pode tentar resolver os erros e recriar outro job de carregamento com segurança. Para mais informações, consulte Como resolver erros.

Verifique se você tem uma cota de job de carregamento suficiente por tabela e projeto. Se você exceder sua cota, o job de carregamento falhará com um erro quotaExceeded.

Não há cobranças por uma operação de carregamento para carregar dados no BigQuery do Cloud Storage. Após o carregamento dos dados no BigQuery, eles estarão sujeitos aos preços de armazenamento do BigQuery. Quando os jobs de carregamento forem concluídos com sucesso, exclua todos os arquivos restantes no Cloud Storage para evitar cobranças pelo armazenamento de dados redundantes.

Validação

Após o carregamento dos dados, é possível validar os dados migrados comparando o número de linhas nas tabelas do Hive e tabelas do BigQuery. Veja as informações da tabela para saber mais sobre tabelas do BigQuery, como número de linhas, de colunas, de campos de particionamento ou de clustering. Para fazer outras validações, use a ferramenta de validação de dados.

Ingestão contínua

Se você ingerir dados continuamente em uma tabela do Hive, faça uma migração inicial e depois migre apenas as alterações de dados incrementais para o BigQuery. É comum criar scripts executados repetidamente para encontrar e carregar novos dados. Há muitas maneiras de fazer isso, e as seções a seguir descrevem uma abordagem possível.

É possível acompanhar o progresso da migração em uma tabela de banco de dados do Cloud SQL, chamada de tabela de rastreamento nas seções a seguir. Durante a primeira execução da migração, armazene o progresso na tabela de rastreamento. Nas execuções de migração subsequentes, use as informações da tabela de rastreamento para detectar se algum outro dado foi ingerido e se pode ser migrado para o BigQuery.

Selecione uma coluna do identificador de tipo INT64, TIMESTAMP ou DATE para diferenciar os dados incrementais. Essa é uma coluna incremental.

A tabela a seguir é o exemplo de uma tabela sem particionamento que usa um tipo de TIMESTAMP para sua coluna incremental:

+-----------------------------+-----------+-----------+-----------+-----------+
| timestamp_identifier        | column_2  | column_3  | column_4  | column_5  |
+-----------------------------+-----------+-----------+-----------+-----------+
| 2018-10-10 21\:56\:41       |           |           |           |           |
| 2018-10-11 03\:13\:25       |           |           |           |           |
| 2018-10-11 08\:25\:32       |           |           |           |           |
| 2018-10-12 05\:02\:16       |           |           |           |           |
| 2018-10-12 15\:21\:45       |           |           |           |           |
+-----------------------------+-----------+-----------+-----------+-----------+

A tabela a seguir é exemplo de tabela particionada em uma coluna do tipo DATE partition_column. Ele tem uma coluna incremental de tipo inteiro int_identifier em cada partição.

+---------------------+---------------------+----------+----------+-----------+
| partition_column    | int_identifier      | column_3 | column_4 | column_5  |
+---------------------+---------------------+----------+----------+-----------+
| 2018-10-01          | 1                   |          |          |           |
| 2018-10-01          | 2                   |          |          |           |
| ...                 | ...                 |          |          |           |
| 2018-10-01          | 1000                |          |          |           |
| 2018-11-01          | 1                   |          |          |           |
| 2018-11-01          | 2                   |          |          |           |
| ...                 | ...                 |          |          |           |
| 2018-11-01          | 2000                |          |          |           |
+---------------------+---------------------+----------+----------+-----------+

As seções a seguir descrevem a migração de dados do Hive com base no fato de eles serem particionados ou não e se há ou não colunas incrementais.

Tabela não particionada sem colunas incrementais

Supondo que não haja compactações de arquivos no Hive, o Hive cria novos arquivos de dados ao ingerir novos dados. Durante a primeira execução, armazene a lista de arquivos na tabela de rastreamento e conclua a migração inicial da tabela do Hive copiando esses arquivos para o Cloud Storage e carregando-os no BigQuery.

> hdfs dfs -ls hdfs://demo_cluster/user/hive/warehouse/corp/employees
Found 3 items
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000000_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000001_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000002_0

Após a migração inicial, alguns dados são ingeridos no Hive. Você só precisa migrar esses dados incrementais para o BigQuery. Nas execuções de migração subsequentes, liste os arquivos de dados novamente e compare-os com as informações da tabela de acompanhamento para detectar novos arquivos de dados que não foram migrados.

> hdfs dfs -ls hdfs://demo_cluster/user/hive/warehouse/corp/employees
Found 5 items
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000000_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000001_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000002_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000003_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000004_0

Nesse exemplo, há dois novos arquivos no local da tabela. Para migrar os dados, copie esses novos arquivos para o Cloud Storage e carregue-os na tabela atual do BigQuery.

Tabela não particionada com colunas incrementais

Nesse caso, é possível usar o valor máximo de colunas incrementais para determinar se algum dado novo foi adicionado. Durante a migração inicial, consulte a tabela do Hive para buscar o valor máximo da coluna incremental e armazená-la na tabela de rastreamento:

hive> SELECT MAX(timestamp_identifier) FROM corp.employees;
2018-12-31 22:15:04

Nas execuções de migração subsequentes, repita a mesma consulta para buscar o valor máximo atual da coluna incremental e compare-a com o valor máximo anterior da tabela de acompanhamento para verificar se há dados incrementais:

hive> SELECT MAX(timestamp_identifier) FROM corp.employees;
2019-01-04 07:21:16

Se o valor máximo atual for maior que o máximo anterior, isso indica que os dados incrementais foram ingeridos na tabela do Hive, como no exemplo. Para migrar os dados incrementais, crie uma tabela de preparo e carregue apenas os dados incrementais nela.

hive> CREATE TABLE stage_employees LIKE corp.employees;
hive> INSERT INTO TABLE stage_employees SELECT * FROM corp.employees WHERE timestamp_identifier>"2018-12-31 22:15:04" and timestamp_identifier<="2019-01-04 07:21:16"

Para migrar a tabela de preparo, liste os arquivos de dados HDFS, copie-os para o Cloud Storage e carregue-os na tabela atual do BigQuery.

Tabela particionada sem colunas incrementais

A ingestão de dados em uma tabela particionada pode criar novas partições, anexar dados incrementais a partições atuais ou fazer ambas. Nesse cenário, é possível identificar essas partições atualizadas, mas não é fácil identificar quais dados foram adicionados a essas partições atuais, já que não há uma coluna incremental para distinguir. Outra opção é criar e manter snapshots do HDFS, mas o snapshot cria preocupações de desempenho para o Hive, portanto, geralmente é desativado.

Ao migrar a tabela pela primeira vez, execute o comando SHOW PARTITIONS e armazene as informações sobre as diferentes partições na tabela de rastreamento.

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01

A saída acima mostra que a tabela employees tem duas partições. Confira abaixo uma versão simplificada da tabela de acompanhamento que mostra como essas informações podem ser armazenadas.

partition_information file_path gcs_copy_status gcs_file_path bq_job_id ...
partition_column =2018-10-01
partition_column =2018-11-01

Nas migrações posteriores, execute o comando SHOW PARTITIONS novamente para listar todas as partições e compará-las com as informações da partição da tabela de rastreamento para verificar se há novas partições que não foram migradas.

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01
partition_column=2018-12-01
partition_column=2019-01-01

Se alguma partição nova for identificada como no exemplo, crie uma tabela de preparo e carregue apenas as novas partições nela a partir da tabela de origem. Migre a tabela de preparo copiando os arquivos para o Cloud Storage e carregando-os na tabela existente do BigQuery.

Tabela particionada com colunas incrementais

Nesse cenário, a tabela do Hive é particionada e uma coluna incremental está presente em cada partição. Os dados ingeridos continuamente incrementam esse valor de coluna. Aqui é possível migrar as novas partições, conforme descrito na seção anterior, além de migrar os dados incrementais que foram ingeridos nas partições atuais.

Ao migrar a tabela pela primeira vez, armazene os valores mínimo e máximo da coluna incremental em cada partição com as informações sobre essas partições na tabela de rastreamento.

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01

hive> SELECT MIN(int_identifier),MAX(int_identifier) FROM corp.employees WHERE partition_column="2018-10-01";
1 1000

hive> SELECT MIN(int_identifier),MAX(int_identifier) FROM corp.employees WHERE partition_column="2018-11-01";
1 2000

A saída acima mostra que os funcionários da tabela têm duas partições e os valores mínimo e máximo da coluna incremental em cada partição. Confira abaixo uma versão simplificada da tabela de acompanhamento que mostra como essas informações podem ser armazenadas.

partition_information inc_col_min inc_col_max file_path gcs_copy_status ...
partition_column =2018-10-01 1 1000
partition_column =2018-11-01 1 2000

Nas execuções seguintes, execute as mesmas consultas para buscar o valor máximo atual em cada partição e compare-o com o valor máximo anterior da tabela de rastreamento.

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01
partition_column=2018-12-01
partition_column=2019-01-01

hive> SELECT MIN(int_identifier),MAX(int_identifier) FROM corp.employees WHERE partition_column="2018-10-01";

No exemplo, duas novas partições foram identificadas e alguns dados incrementais foram ingeridos na partição atual partition_column=2018-10-01. Caso haja dados incrementais, crie uma tabela de preparo, carregue apenas os dados incrementais na tabela de preparo, copie os dados para o Cloud Storage e carregue os dados na tabela existente do BigQuery.

Configurações de segurança

O BigQuery usa o IAM para gerenciar o acesso aos recursos. Os papéis predefinidos do BigQuery oferecem acesso granular a um serviço específico e oferecem suporte a casos de uso comuns e padrões de controle de acesso. Você pode usar papéis personalizados para fornecer acesso ainda mais refinado personalizando um conjunto de permissões.

Os controles de acesso em tabelas e conjuntos de dados especificam as operações que usuários, grupos e contas de serviço podem executar em tabelas, visualizações e conjuntos de dados. As visualizações autorizadas permitem compartilhar resultados da consulta com usuários e grupos específicos sem que eles tenham acesso aos dados de origem subjacentes. Com a segurança no nível da linha e a segurança no nível da coluna, é possível restringir quem pode acessar quais linhas ou colunas em uma tabela. O mascaramento de dados permite ocultar seletivamente os dados de colunas de grupos de usuários, permitindo o acesso à coluna.

Ao aplicar controles de acesso, você pode conceder acesso aos seguintes usuários e grupos:

  • Usuário por e-mail: fornece acesso ao conjunto de dados a uma Conta do Google individual.
  • Grupo por e-mail: fornece a todos os membros de um grupo do Google acesso ao conjunto de dados.
  • Domínio: fornece acesso ao conjunto de dados a todos os usuários e grupos em um domínio do Google
  • Todos os usuários autenticados: fornece a todos os titulares de contas do Google acesso ao conjunto de dados, tornando-o público.
  • Proprietários do projeto: fornece acesso ao conjunto de dados a todos os proprietários do projeto
  • Visualizadores do projeto: fornece acesso ao conjunto de dados a todos os visualizadores do projeto
  • Editores do projeto: fornece acesso ao conjunto de dados a todos os editores do projeto
  • Visualização autorizada: fornece acesso de visualização ao conjunto de dados

Alterações no pipeline de dados

Veja nas seções a seguir como alterar os pipelines de dados ao migrar do Hive para o BigQuery.

Sqoop

Se o pipeline atual usar o Sqoop para importar dados para HDFS ou Hive para processamento, modifique o job para importar dados para o Cloud Storage.

Se você estiver importando dados para o HDFS, escolha uma das seguintes opções:

Se você quiser que a Sqoop importe dados para o Hive em execução no Google Cloud, aponte-a diretamente para a tabela do Hive e use o Cloud Storage como o warehouse do Hive em vez do HDFS. Para fazer isso, defina a propriedade hive.metastore.warehouse.dir como um bucket do Cloud Storage.

É possível executar seu job do Sqoop sem gerenciar um cluster do Hadoop usando o Dataproc para enviar jobs do Sqoop a fim de importar dados para o BigQuery.

Spark SQL e HiveQL

O tradutor de SQL em lote ou o tradutor de SQL interativo podem traduzir automaticamente o SQL do Spark ou o HiveQL para o GoogleSQL.

Se você não quiser migrar o Spark SQL ou HiveQL para o BigQuery, use o Dataproc ou o conector do BigQuery com o Apache Spark.

ETL no Hive

Se houver jobs de ETL no Hive, modifique-os das seguintes maneiras para migrá-los do Hive:

  • Converta o job de ETL do Hive em um job do BigQuery usando o tradutor de SQL em lote.
  • Use o Apache Spark para ler e gravar no BigQuery usando o conector do BigQuery. É possível usar o Dataproc para executar seus jobs do Spark de maneira econômica com a ajuda de clusters efêmeros.
  • Reescreva os pipelines usando o SDK do Apache Beam e execute-os no Dataflow.
  • Use o SQL do Apache Beam para reescrever seus pipelines.

Para gerenciar seu pipeline de ETL, use o Cloud Composer (Apache Airflow) e os modelos de fluxo de trabalho do Dataproc. O Cloud Composer fornece uma ferramenta para converter fluxos de trabalho do Oozie em fluxos de trabalho do Cloud Composer.

Dataflow

Se você quiser mover seu pipeline de ETL do Hive para serviços de nuvem totalmente gerenciados, considere gravar seus pipelines de dados usando o SDK do Apache Beam e executá-los no Dataflow.

O Dataflow é um serviço gerenciado para executar pipelines de processamento de dados. Ele executa programas escritos usando o framework de código aberto Apache Beam. O Apache Beam é um modelo de programação unificado que permite desenvolver canais de lote e streaming.

Se os pipelines de dados forem a movimentação de dados padrão, é possível usar modelos do Dataflow para criar rapidamente pipelines do Dataflow sem escrever código. Consulte este modelo fornecido pelo Google para ler arquivos de texto do Cloud Storage, aplicar transformações e gravar os resultados em uma tabela do BigQuery.

Para simplificar ainda mais o processamento de dados, teste também o SQL do Beam, que permite processar dados usando instruções semelhantes a SQL.