quinta-feira, 29 de agosto de 2019

Apache Spark no Python: PySpark com o Windows usando o Pycharm


Dando continuidade ao artigo: Ambiente para rodar o Spark com Scala na IDE do Eclipse (link: https://slothbigdata.blogspot.com/2019/07/ambiente-para-rodar-o-spark-com-scala.html), no qual preparamos o Eclipse para executar os processos do Spark no Windows utilizando a linguagem Scala.

Efetuaremos o mesmo processo nesse post, mas utilizando o Spark, programando em Python dentro da IDE do Pycharm. Assim será possível executar os seus próprios modelos, sem a necessidade de utilizar um cluster ou uma VM com a distribuição do Big Data.



1 – Instalação do Java

Primeiro passo é efetuar a instalação da JVM do Java para rodar os pacotes do Spark. No caso abaixo vamos aproveitar e instalar o pacote de desenvolvimento do Java (JDK).
Efetuar a instalação do JDK e definir as variáveis de ambiente no Windows.



Criar a variável do JAVA_HOME no diretório no qual a aplicação foi instalada.


 Editando a variável Path, insira o caminho do executável do Java.


%JAVA_HOME%\bin


Para teste executar o teste, efetue o comando no Prompt do Windows:


javac -version









2 – Instalação do Python

Para a instalação do Python vamos utilizar uma versão do Anconda, que já vem carregada de bibliotecas matemáticas.



Selecione a opção “Windows”, baixe o executável,  efetue a instalação com os passos requeridos.

Após a instalação acesse as variáveis de ambiente e insira o caminho do Python na variável Path do mesmo modo que foi configurado o Java acima.


Comum o ambiente do Anaconda está configurado nesse caminho:
C:\Users\meuusuario\AppData\Local\Continuum\anaconda3


Para o teste acesso o Prompt e execute o comando:


python –version









3 – Preparando o ambiente do Hadoop no Windows:

Revendo o artigo: Ambiente para rodar o Spark com Scala na IDE do Eclipse (link: https://slothbigdata.blogspot.com/2019/07/ambiente-para-rodar-o-spark-com-scala.html), devem executar o processo de implantação do WinUtils no Windows. O arquivo pode ser baixado no link: https://github.com/slothbigdata/SlothBigDataSparkLicao.


Ao baixar o arquivo e descompacta-lo, novamente devemos definir a variável de ambiente HADOOP_HOME, para o caminho do diretório.


Insira a nova variável no Path:

%HADOOP%\bin





4 – Instalação das bibliotecas do Spark.

Efetuar o download do arquivo:

Descompactar o arquivo no diretório C:



Visão do diretório:

Entrar no command line do Windows.
Acessar o diretório: C:\spark-2.4.3-bin-hadoop2.7\bin
Digitar: pyspark



Após a instalação conseguiremos utilizar o Spark pelo Prompt de Comando do Windows.





5 – Utilizando o desenvolvimento no Pycharm.

Agora configuraremos o Spark para rodar no PyCharm, para isso temos que configurar mais variáveis de ambiente. Agora vamos inserir o SPARK_HOME, com o caminho do pacote do Spark que baixamos acima.

C:\spark-2.4.3-bin-hadoop2.7


Incluir no Path: %SPARK_HOME%\bin



Para teste, execute o pyspark no prompt em qualquer local.



Abaixe a IDE do PyCharm no site: https://www.jetbrains.com/pycharm/

Após a instalação do Pycharm crie um novo projeto. Clicando na versão do Python, vamos selecionar Interpret Settings -> Project Structure





Clique em Add Content Root:

Adicione o diretório do Python:
C:\spark-2.4.3-bin-hadoop2.7\python

Adicione o py4j-0.9-src.zip no diretório:
C:\spark-2.4.3-bin-hadoop2.7\python\lib\ py4j-0.10.7-src.zip


Após essas configurações, reinicie o PyCharm e o Spark esta pronto para rodar na IDE.


Em caso de dúvidas ou sugestões, escreva nos comentários ou nos mande um email: slothbigdata@gmail.com.


quinta-feira, 8 de agosto de 2019

Arquitetura de dados em Big Data



Conforme experiências em projetos atuais, tenho encontrado muitos analistas que tem dificuldade em efetuar consultas nas bases de dados na estrutura do Big Data. A maioria das ingestões são feitas no Data Lake diariamente com apena um “Delta” das informações, que seriam atualizações ocorridas dos registros durante o período de um dia.

Nos projetos o Data Lake se tornar uma referência as fontes de origem, devido a documentação do dado ser compátivel ao sistema que originou a informação e a sua vizibilidade se torna maior devido ao conhecimento dos colaboradores dessa estrutura transacional, deste modo, o Data Lake acaba  sendo fonte para diversas consultas no modo dimensional.

Esse processo ao consultar o Datalake onera drasticamente o cluster, pois cada consulta aos dados brutos, a engine percorre toda a tabela ocorrendo o “full-scan”. Devemos lembrar que diferentemente de uma estrutura de um banco de dados relacional, o Hive por ser um sumarizador de dados, ele não tem os mesmos recursos de indexadores que um banco relacional, mas leva vantagem na paralelização dos processos.

Para esclarecer melhor esse cenário, vamos supor que um cientista de dados necessita montar um modelo no qual ele precisa consultar a cidade na tabela de clientes.


No caso de uma consulta pelo Data Lake, o processo deverá percorrer todas as partições e trazer o registro mais recente, no caso acima está mencionado um exemplo de dados com 6 partições, se utilizarmos a persistência de dados de 3 anos, são 1095 partições.
Caso em consultas no Data Lake o join mais performático é feito da seguinte maneira:

Seguindo os conceitos "top down" definido por  Bill Inmon, de amenizar esse processamento de toda tabela, seria a construção de um Data Mart no qual a tabela seria atualizada diariamente.



Diariamente seria executado um script no qual consultaria a nova partição criada no Data Lake e atualizaria seu Data Mart. Essa tabela geralmente não é particionada e sua performance referente a consulta, será bem menos onerosa ao cluster.



O modelo de query para executar o processo diário:

--INSERIR EM UMA TABELA A VISAO DO DIA ANTERIOR

INSERT INTO TABLE TABELA_UNIAO
SELECT ID, NOME, CIDADE, DATA_INGESTAO FROM DATAMART.CLIENTE;

--INSERIR O DIA DA NOVA PARTICAO DO DATALAKE
INSERT INTO TABLE TABELA_UNIAO
SELECT ID, NOME, CIDADE, DATA_INGESTAO 
FROM DATAMART.CLIENTE 
WHERE DATA_INGESTAO = "2019-01-06";

--CARREGAR O DATA MART COM OS DADOS MAIS RECENTES
INSERT OVERWRITE TABLE DATAMART.CLIENTE
SELECT
SUB_CLIENTE.ID,
SUB_CLIENTE.NOME,
SUB_CLIENTE.CIDADE,
SUB_CLIENTE.DATA_INGESTAO
FROM(
ID,
NOME,
CIDADE,
DATA_INGESTAO,
RANK() OVER (PARTITION BY ID ORDER BY DATA_INGESTAO DESC) RANKING
FROM DATALAKE.CLIENTE
) SUB_CLIENTE
WHERE SUB_CLIENTE.RANKING = 1;

Algumas orientações podem ser aplicadas:

  • Em Big Data a replicação dos dados é permitida se caso for utilizado para o aumento de performance na consulta, mas deve ser avalidado com ponderação.
  • Na instrução sql aplicada acima, utilizamos o RANK(), pois apresenta melhor performance na execução, mas caso possui uma duplicação de chaves (ID e DATA_INGESTAO), apresentará a quantidade de registros com o mesmo ranque (No caso acima multiplos registros com SUB_CLIENTE.RANKING = 1), para solucionar esse caso é possível utilizar a função ROW_NUMBER(). Fonte: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics 
  • Outra forma segura de carregar o Data Mart e diminuir a indisponibilidade é utilizar o processo Exchange Partition.                    Fonte: https://cwiki.apache.org/confluence/display/Hive/Exchange+Partition
O assunto de estrutura de dados é algo importante e deve ser discutida, pois em alguns casos o processo começa apresentar falhas após meses de operação sistema, no qual,  já está finalizado  o período de garantia oferecido pelo fornecedor.


Em caso de dúvidas ou sugestões, escreva nos comentários ou nos mande um email: slothbigdata@gmail.com.


sábado, 27 de julho de 2019

Fast Data Modulo 1: Instalação do Apache Kafka



Kafka é um sistema distribuidor, particionador e replicador de comits de serviços de logs. Ele fornece a funcionalidade de um sistema de mensagens, mas com um design único. O Kafka inscreve e publica registros em um fluxo de mensagens, armazenando esses fluxos de uma maneira durável e tolerante a falhas.



Pré-requisitos:
VM com o CENTOS 7 instalado:
Java 8 instalado na VM.



1    1. Baixar o Apache Kafka na VM, para isso digite a instrução: 

      2. Descompactar o arquivo com o seguinte comando:
    #tar -xzf kafka_2.11-2.2.0.tgz
    #mv kafka_2.11-2.2.0.tgz kafka
    #cd kafka

3. Para configurar o Kafka entre do diretório config da pasta Kafka.

               Copie o arquivo server.properties para um novo server-1.properties

               #cp server.properties server-1.properties
               #vi server-1.properties

               Defina as seguintes configurações:
·        Na propriedade broker.id, coloque o numero identifica o server do Kafka, como nós estamos fazendo em um único nó, vamos colocar o valor 1
·        Na propriedade listeners, vamos inserir o IPV4 da vm.
·        Na log.dirs, configure o diretório de log para o server do Kafka
·        Para a propriedade zookeeper.connect, mencione os ips do zookeeper, no nosso caso será o mesmo da máquina.

broker.id=1
listeners=PLAINTEXT://192.168.56.102:9092
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=192.168.56.102:2181

      4. Inciar o serviço entre na pasta do kafka:

·        Zookeeper, cria em id na pasta tmp e inicie o serviço:

#mkdir /tmp/zookeeper/
#echo "1" > /tmp/zookeeper/myid
#nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

·        Kafka
#nohup bin/kafka-server-start.sh config/server-1.properties &

      5. Criar um tópico:

#bin/kafka-topics.sh --create --zookeeper 192.168.56.102:2181 --replication-factor 1  --partitions 1 --topic nomedotopico

 Listar todos os tópicos

#bin/kafka-topics.sh --list --zookeeper 192.168.56.102:2181


1    6. Testando um Producer e um Consumer:

      Crie duas sessões de acesso ao putty, uma será chamado o consumer:

      #bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.102:9092 --topic elastic --from-beginning
    
      Na outra será criado o produtor de mensagens
       
      #bin/kafka-console-producer.sh --broker-list 192.168.56.102:9092 --topic elastic

      Ao digitar a mensagem no sessão do producer a informação deverá aparecer no consumer.



Essa instalação do Kafka, porem de muito simples, vai ser utilizado como base para se trabalhar com diversas ferramentas de fast data.

Em caso de dúvidas ou sugestões, escreva nos comentários ou nos mande um email: slothbigdata@gmail.com.

quinta-feira, 25 de julho de 2019

Apache Spark com Scala: Manipulação de DataFrame Etapa1


O DataFrame é um coleção distribuida de dados, no qual oferece os beneficios  do RDD  (Resilient Distributed Dataset)  com uma tipificação forte, untilizando as funções Lambdas das liguagem, para a manipulação de registros executando funçoes de join, filter, sum, flatmap, etc.
O DataFrame esta incluso nas bibliotecas Spark SQL, oferecidas pelo Apache Spark e torna a visão dos dados de acordo a uma tabela relacional.

Para executar o exemplo abaixo, é necessário seguir o procedimento do artigo:

https://slothbigdata.blogspot.com/2019/07/ambiente-para-rodar-o-spark-com-scala.html

Para o exercício, será criado uma instância do SparkSession, que esta disponível nas versões do Spark Apache 2.0

package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
  
   }

}

Instanciado o SparkSession definindo o nome e a engine de processamento, insira o arquivo cidadao.json em uma pasta, este arquivo foi baixado no repositório do git conforme o artigo mencionado acima.
Capture o arquivo json com o seguinte processo.

1 - Cria um StructType para definição do schema da tabela. O StructType é uma classe do Spark no qual efetua o mapeamento do Dataframe.

2 - Carga de dados do Json:


package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext                                                                             .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //print no Driver
     dfcidadao.collect().map(row => println(row))
    
   }
}



Com o Json estruturado para um dataframe "dfcidadao", agora pode-se efetuar um filtro com dados, no caso abaixo será efetuar um filtro para o estado de SP, para esse processo pode-se ocorrer em três modos:

Informar dataframe("coluna")

val dffilter1 = dfcidadao.filter(dfcidadao("uf")==="SP")

Mas caso utilizar o pacote: org.apache.spark.sql.functions._: col("coluna")

val dffilter1 = dfcidadao.filter(col("uf")==="SP")

Mas se utilizar as classes implicitas: spark.implicits._ : $"coluna"

val dffilter1 = dfcidadao.filter($"uf"==="SP"

Código completo:


package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
  
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext
                        .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //filtro para a uf SP
     val dffilter1 = dfcidadao.filter(col("uf")==="SP")
     dffilter1.collect().map(row => println(row))
  
   }
}

Processo de Join

Para o caso de join, jogue em um diretório o do git: profissoes.csv
Agora vamos ler um arquivo em CSV com o seguinte processo.

1 - Criar o StructType com a estrutura de dados do CSV.
2 - Importar o CSV e criar um Dataframe.

package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
  
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext
                        .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //filtro para a uf SP
     val dffilter1 = dfcidadao.filter(col("uf")==="SP")
    
     //csv
     val schemaProfissao = StructType(
                              List(
                                StructField("id_profissao",IntegerType,true),
                                StructField("descricao",StringType,true)
                              )
                            )   
     val dfprof = spark.read
                       .format("csv")
                       .option("header","true")
                       .schema(schemaProfissao)
                       .load("C:\\dados\\arquivo_cidadao\\profissoes.csv")
     dfprof.collect().map(row => println(row))

   }
}

Para efetuar um Join com o dataframe dffilter1 com o dataframe dfprof pode-se efeuar em duas maneiras:

Método: dataframe1.join(dataframe2, condicao, operacao)

Informando os dataframes:  dataframe(coluna)

dffilter1.join(dfprof,dffilter1("id_profissao") === dfprof("id_profissao"),"inner")

Utilizando a o pacote : org.apache.spark.sql.functions._: Seq("coluna") << nesse caso as colunas relacionadas no dataframe devem ter o mesmo nome.

dffilter1.join(dfprof,Seq("id_profissao"),"inner")


Código completo do Join:

package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
  
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext
                        .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //filtro para a uf SP
     val dffilter1 = dfcidadao.filter(col("uf")==="SP")
    
     //csv
     val schemaProfissao = StructType(
                              List(
                                StructField("id_profissao",IntegerType,true),
                                StructField("descricao",StringType,true)
                              )
                            )     
    
     val dfprof = spark.read
                       .format("csv")
                       .option("header","true")
                       .schema(schemaProfissao)
                       .load("C:\\dados\\arquivo_cidadao\\profissoes.csv")
    
     val dfCidadaoProfissao = dffilter1.join(dfprof,Seq("id_profissao"),"inner")

     //join das pessoas com as profissoes
     dfCidadaoProfissao.collect().map(row => println(row))    
   }
}



Para finalizar o processo, vamos efetuar uma contagem dos profissionais de SP, a instrução será agrupar as profissões e mostrar as quantidades de profissionais:

val qtdprofissionaissp = dfCidadaoProfissao.groupBy($"descricao").count;
     
Código completo com o Count.

package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext
                        .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //filtro para a uf SP
     val dffilter1 = dfcidadao.filter(col("uf")==="SP")
     //csv
     val schemaProfissao = StructType(
                              List(
                                StructField("id_profissao",IntegerType,true),
                                StructField("descricao",StringType,true)
                              )
                            )     
     val dfprof = spark.read
                       .format("csv")
                       .option("header","true")
                       .schema(schemaProfissao)
                       .load("C:\\dados\\arquivo_cidadao\\profissoes.csv")
  
     //join das pessoas com as profissoes
     val dfCidadaoProfissao = dffilter1.join(dfprof,Seq("id_profissao"),"inner")
    
     //profissionais no estado de sao paulo
     val qtdprofissionaissp = dfCidadaoProfissao.groupBy($"descricao").count;
     qtdprofissionaissp.collect().map(row => println(row))
   }

}



Em caso de dúvidas ou sugestões, escreva nos comentários ou nos mande um email: slothbigdata@gmail.com.

Veja também: https://slothbigdata.blogspot.com/2019/07/apache-spark-com-scala-manipulacao-do.html