Mostrando postagens com marcador Spark. Mostrar todas as postagens
Mostrando postagens com marcador Spark. Mostrar todas as postagens

quinta-feira, 29 de agosto de 2019

Apache Spark no Python: PySpark com o Windows usando o Pycharm


Dando continuidade ao artigo: Ambiente para rodar o Spark com Scala na IDE do Eclipse (link: https://slothbigdata.blogspot.com/2019/07/ambiente-para-rodar-o-spark-com-scala.html), no qual preparamos o Eclipse para executar os processos do Spark no Windows utilizando a linguagem Scala.

Efetuaremos o mesmo processo nesse post, mas utilizando o Spark, programando em Python dentro da IDE do Pycharm. Assim será possível executar os seus próprios modelos, sem a necessidade de utilizar um cluster ou uma VM com a distribuição do Big Data.



1 – Instalação do Java

Primeiro passo é efetuar a instalação da JVM do Java para rodar os pacotes do Spark. No caso abaixo vamos aproveitar e instalar o pacote de desenvolvimento do Java (JDK).
Efetuar a instalação do JDK e definir as variáveis de ambiente no Windows.



Criar a variável do JAVA_HOME no diretório no qual a aplicação foi instalada.


 Editando a variável Path, insira o caminho do executável do Java.


%JAVA_HOME%\bin


Para teste executar o teste, efetue o comando no Prompt do Windows:


javac -version









2 – Instalação do Python

Para a instalação do Python vamos utilizar uma versão do Anconda, que já vem carregada de bibliotecas matemáticas.



Selecione a opção “Windows”, baixe o executável,  efetue a instalação com os passos requeridos.

Após a instalação acesse as variáveis de ambiente e insira o caminho do Python na variável Path do mesmo modo que foi configurado o Java acima.


Comum o ambiente do Anaconda está configurado nesse caminho:
C:\Users\meuusuario\AppData\Local\Continuum\anaconda3


Para o teste acesso o Prompt e execute o comando:


python –version









3 – Preparando o ambiente do Hadoop no Windows:

Revendo o artigo: Ambiente para rodar o Spark com Scala na IDE do Eclipse (link: https://slothbigdata.blogspot.com/2019/07/ambiente-para-rodar-o-spark-com-scala.html), devem executar o processo de implantação do WinUtils no Windows. O arquivo pode ser baixado no link: https://github.com/slothbigdata/SlothBigDataSparkLicao.


Ao baixar o arquivo e descompacta-lo, novamente devemos definir a variável de ambiente HADOOP_HOME, para o caminho do diretório.


Insira a nova variável no Path:

%HADOOP%\bin





4 – Instalação das bibliotecas do Spark.

Efetuar o download do arquivo:

Descompactar o arquivo no diretório C:



Visão do diretório:

Entrar no command line do Windows.
Acessar o diretório: C:\spark-2.4.3-bin-hadoop2.7\bin
Digitar: pyspark



Após a instalação conseguiremos utilizar o Spark pelo Prompt de Comando do Windows.





5 – Utilizando o desenvolvimento no Pycharm.

Agora configuraremos o Spark para rodar no PyCharm, para isso temos que configurar mais variáveis de ambiente. Agora vamos inserir o SPARK_HOME, com o caminho do pacote do Spark que baixamos acima.

C:\spark-2.4.3-bin-hadoop2.7


Incluir no Path: %SPARK_HOME%\bin



Para teste, execute o pyspark no prompt em qualquer local.



Abaixe a IDE do PyCharm no site: https://www.jetbrains.com/pycharm/

Após a instalação do Pycharm crie um novo projeto. Clicando na versão do Python, vamos selecionar Interpret Settings -> Project Structure





Clique em Add Content Root:

Adicione o diretório do Python:
C:\spark-2.4.3-bin-hadoop2.7\python

Adicione o py4j-0.9-src.zip no diretório:
C:\spark-2.4.3-bin-hadoop2.7\python\lib\ py4j-0.10.7-src.zip


Após essas configurações, reinicie o PyCharm e o Spark esta pronto para rodar na IDE.


Em caso de dúvidas ou sugestões, escreva nos comentários ou nos mande um email: slothbigdata@gmail.com.


quinta-feira, 25 de julho de 2019

Apache Spark com Scala: Manipulação de DataFrame Etapa1


O DataFrame é um coleção distribuida de dados, no qual oferece os beneficios  do RDD  (Resilient Distributed Dataset)  com uma tipificação forte, untilizando as funções Lambdas das liguagem, para a manipulação de registros executando funçoes de join, filter, sum, flatmap, etc.
O DataFrame esta incluso nas bibliotecas Spark SQL, oferecidas pelo Apache Spark e torna a visão dos dados de acordo a uma tabela relacional.

Para executar o exemplo abaixo, é necessário seguir o procedimento do artigo:

https://slothbigdata.blogspot.com/2019/07/ambiente-para-rodar-o-spark-com-scala.html

Para o exercício, será criado uma instância do SparkSession, que esta disponível nas versões do Spark Apache 2.0

package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
  
   }

}

Instanciado o SparkSession definindo o nome e a engine de processamento, insira o arquivo cidadao.json em uma pasta, este arquivo foi baixado no repositório do git conforme o artigo mencionado acima.
Capture o arquivo json com o seguinte processo.

1 - Cria um StructType para definição do schema da tabela. O StructType é uma classe do Spark no qual efetua o mapeamento do Dataframe.

2 - Carga de dados do Json:


package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext                                                                             .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //print no Driver
     dfcidadao.collect().map(row => println(row))
    
   }
}



Com o Json estruturado para um dataframe "dfcidadao", agora pode-se efetuar um filtro com dados, no caso abaixo será efetuar um filtro para o estado de SP, para esse processo pode-se ocorrer em três modos:

Informar dataframe("coluna")

val dffilter1 = dfcidadao.filter(dfcidadao("uf")==="SP")

Mas caso utilizar o pacote: org.apache.spark.sql.functions._: col("coluna")

val dffilter1 = dfcidadao.filter(col("uf")==="SP")

Mas se utilizar as classes implicitas: spark.implicits._ : $"coluna"

val dffilter1 = dfcidadao.filter($"uf"==="SP"

Código completo:


package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
  
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext
                        .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //filtro para a uf SP
     val dffilter1 = dfcidadao.filter(col("uf")==="SP")
     dffilter1.collect().map(row => println(row))
  
   }
}

Processo de Join

Para o caso de join, jogue em um diretório o do git: profissoes.csv
Agora vamos ler um arquivo em CSV com o seguinte processo.

1 - Criar o StructType com a estrutura de dados do CSV.
2 - Importar o CSV e criar um Dataframe.

package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
  
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext
                        .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //filtro para a uf SP
     val dffilter1 = dfcidadao.filter(col("uf")==="SP")
    
     //csv
     val schemaProfissao = StructType(
                              List(
                                StructField("id_profissao",IntegerType,true),
                                StructField("descricao",StringType,true)
                              )
                            )   
     val dfprof = spark.read
                       .format("csv")
                       .option("header","true")
                       .schema(schemaProfissao)
                       .load("C:\\dados\\arquivo_cidadao\\profissoes.csv")
     dfprof.collect().map(row => println(row))

   }
}

Para efetuar um Join com o dataframe dffilter1 com o dataframe dfprof pode-se efeuar em duas maneiras:

Método: dataframe1.join(dataframe2, condicao, operacao)

Informando os dataframes:  dataframe(coluna)

dffilter1.join(dfprof,dffilter1("id_profissao") === dfprof("id_profissao"),"inner")

Utilizando a o pacote : org.apache.spark.sql.functions._: Seq("coluna") << nesse caso as colunas relacionadas no dataframe devem ter o mesmo nome.

dffilter1.join(dfprof,Seq("id_profissao"),"inner")


Código completo do Join:

package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
  
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext
                        .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //filtro para a uf SP
     val dffilter1 = dfcidadao.filter(col("uf")==="SP")
    
     //csv
     val schemaProfissao = StructType(
                              List(
                                StructField("id_profissao",IntegerType,true),
                                StructField("descricao",StringType,true)
                              )
                            )     
    
     val dfprof = spark.read
                       .format("csv")
                       .option("header","true")
                       .schema(schemaProfissao)
                       .load("C:\\dados\\arquivo_cidadao\\profissoes.csv")
    
     val dfCidadaoProfissao = dffilter1.join(dfprof,Seq("id_profissao"),"inner")

     //join das pessoas com as profissoes
     dfCidadaoProfissao.collect().map(row => println(row))    
   }
}



Para finalizar o processo, vamos efetuar uma contagem dos profissionais de SP, a instrução será agrupar as profissões e mostrar as quantidades de profissionais:

val qtdprofissionaissp = dfCidadaoProfissao.groupBy($"descricao").count;
     
Código completo com o Count.

package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext
                        .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //filtro para a uf SP
     val dffilter1 = dfcidadao.filter(col("uf")==="SP")
     //csv
     val schemaProfissao = StructType(
                              List(
                                StructField("id_profissao",IntegerType,true),
                                StructField("descricao",StringType,true)
                              )
                            )     
     val dfprof = spark.read
                       .format("csv")
                       .option("header","true")
                       .schema(schemaProfissao)
                       .load("C:\\dados\\arquivo_cidadao\\profissoes.csv")
  
     //join das pessoas com as profissoes
     val dfCidadaoProfissao = dffilter1.join(dfprof,Seq("id_profissao"),"inner")
    
     //profissionais no estado de sao paulo
     val qtdprofissionaissp = dfCidadaoProfissao.groupBy($"descricao").count;
     qtdprofissionaissp.collect().map(row => println(row))
   }

}



Em caso de dúvidas ou sugestões, escreva nos comentários ou nos mande um email: slothbigdata@gmail.com.

Veja também: https://slothbigdata.blogspot.com/2019/07/apache-spark-com-scala-manipulacao-do.html