O DataFrame é um coleção distribuida de dados, no qual oferece os beneficios do RDD (Resilient Distributed Dataset) com uma tipificação forte, untilizando as funções Lambdas das liguagem, para a manipulação de registros executando funçoes de join, filter, sum, flatmap, etc.
O DataFrame esta incluso nas bibliotecas Spark SQL, oferecidas pelo Apache Spark e torna a visão dos dados de acordo a uma tabela relacional.
Para executar o exemplo abaixo, é necessário seguir o procedimento do artigo:
https://slothbigdata.blogspot.com/2019/07/ambiente-para-rodar-o-spark-com-scala.html
Para o exercício, será criado uma instância do SparkSession, que esta disponível nas versões do Spark Apache 2.0
package com.slothbigdata.licao
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
object DataFrameFilterJoin {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Spark
Pi")
.master("local")
.getOrCreate()
}
}
Instanciado o SparkSession definindo o nome e a engine de processamento, insira o arquivo cidadao.json em uma pasta, este arquivo foi baixado no repositório do git conforme o artigo mencionado acima.
Capture o arquivo json com o seguinte processo.
1 - Cria um StructType para definição do schema da tabela. O StructType é uma classe do Spark no qual efetua o mapeamento do Dataframe.
2 - Carga de dados do Json:
package com.slothbigdata.licao
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
object DataFrameFilterJoin {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Spark
Pi")
.master("local")
.getOrCreate()
import spark.implicits._
//definindo
schema
val schemaCidadao = StructType(
List(
StructField("nome",StringType,true),
StructField("idade",IntegerType,true),
StructField("cidade",StringType,true),
StructField("uf",StringType,true),
StructField("id_profissao",IntegerType,true)
)
)
//json
val jsonRDD = spark.sparkContext .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
.map(x => x._2)
val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
//print no
Driver
dfcidadao.collect().map(row => println(row))
}
}
Com o Json estruturado para um dataframe "
dfcidadao", agora pode-se efetuar um filtro com dados, no caso abaixo será efetuar um filtro para o estado de SP, para esse processo pode-se ocorrer em três modos:
Informar dataframe("coluna")
val dffilter1 = dfcidadao.filter(dfcidadao("uf")==="SP")
Mas caso utilizar o pacote: org.apache.spark.sql.functions._: col("coluna")
val dffilter1 = dfcidadao.filter(col("uf")==="SP")
Mas se utilizar as classes implicitas: spark.implicits._ : $"coluna"
val dffilter1 = dfcidadao.filter($"uf"==="SP")
Código completo:
package com.slothbigdata.licao
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
object DataFrameFilterJoin {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Spark
Pi")
.master("local")
.getOrCreate()
import spark.implicits._
//definindo
schema
val schemaCidadao = StructType(
List(
StructField("nome",StringType,true),
StructField("idade",IntegerType,true),
StructField("cidade",StringType,true),
StructField("uf",StringType,true),
StructField("id_profissao",IntegerType,true)
)
)
//json
val jsonRDD = spark.sparkContext
.wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
.map(x => x._2)
val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
//filtro
para a uf SP
val dffilter1 = dfcidadao.filter(col("uf")==="SP")
dffilter1.collect().map(row => println(row))
}
}
Processo de Join
Para o caso de join, jogue em um diretório o do git: profissoes.csv
Agora vamos ler um arquivo em CSV com o seguinte processo.
1 - Criar o StructType com a estrutura de dados do CSV.
2 - Importar o CSV e criar um Dataframe.
package com.slothbigdata.licao
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
object DataFrameFilterJoin {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Spark
Pi")
.master("local")
.getOrCreate()
import spark.implicits._
//definindo
schema
val schemaCidadao = StructType(
List(
StructField("nome",StringType,true),
StructField("idade",IntegerType,true),
StructField("cidade",StringType,true),
StructField("uf",StringType,true),
StructField("id_profissao",IntegerType,true)
)
)
//json
val jsonRDD = spark.sparkContext
.wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
.map(x => x._2)
val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
//filtro para
a uf SP
val dffilter1 = dfcidadao.filter(col("uf")==="SP")
//csv
val schemaProfissao = StructType(
List(
StructField("id_profissao",IntegerType,true),
StructField("descricao",StringType,true)
)
)
val dfprof = spark.read
.format("csv")
.option("header","true")
.schema(schemaProfissao)
.load("C:\\dados\\arquivo_cidadao\\profissoes.csv")
dfprof.collect().map(row => println(row))
}
}
Para efetuar um Join com o dataframe dffilter1 com o dataframe dfprof pode-se efeuar em duas maneiras:
Método: dataframe1.join(dataframe2, condicao, operacao)
Informando os dataframes: dataframe(coluna)
dffilter1.join(dfprof,dffilter1("id_profissao") === dfprof("id_profissao"),"inner")
Utilizando a o pacote : org.apache.spark.sql.functions._:
Seq("coluna") << nesse caso as colunas relacionadas no dataframe devem ter o mesmo nome.
dffilter1.join(dfprof,Seq("id_profissao"),"inner")
Código completo do Join:
package com.slothbigdata.licao
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
object DataFrameFilterJoin {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Spark
Pi")
.master("local")
.getOrCreate()
import spark.implicits._
//definindo
schema
val schemaCidadao = StructType(
List(
StructField("nome",StringType,true),
StructField("idade",IntegerType,true),
StructField("cidade",StringType,true),
StructField("uf",StringType,true),
StructField("id_profissao",IntegerType,true)
)
)
//json
val jsonRDD = spark.sparkContext
.wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
.map(x => x._2)
val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
//filtro para
a uf SP
val dffilter1 = dfcidadao.filter(col("uf")==="SP")
//csv
val schemaProfissao = StructType(
List(
StructField("id_profissao",IntegerType,true),
StructField("descricao",StringType,true)
)
)
val dfprof = spark.read
.format("csv")
.option("header","true")
.schema(schemaProfissao)
.load("C:\\dados\\arquivo_cidadao\\profissoes.csv")
val dfCidadaoProfissao = dffilter1.join(dfprof,Seq("id_profissao"),"inner")
//join das
pessoas com as profissoes
dfCidadaoProfissao.collect().map(row => println(row))
}
}
Para finalizar o processo, vamos efetuar uma contagem dos profissionais de SP, a instrução será agrupar as profissões e mostrar as quantidades de profissionais:
val qtdprofissionaissp = dfCidadaoProfissao.groupBy($"descricao").count;
Código completo com o Count.
package com.slothbigdata.licao
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
object DataFrameFilterJoin {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Spark
Pi")
.master("local")
.getOrCreate()
import spark.implicits._
//definindo
schema
val schemaCidadao = StructType(
List(
StructField("nome",StringType,true),
StructField("idade",IntegerType,true),
StructField("cidade",StringType,true),
StructField("uf",StringType,true),
StructField("id_profissao",IntegerType,true)
)
)
//json
val jsonRDD = spark.sparkContext
.wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
.map(x => x._2)
val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
//filtro
para a uf SP
val dffilter1 = dfcidadao.filter(col("uf")==="SP")
//csv
val schemaProfissao = StructType(
List(
StructField("id_profissao",IntegerType,true),
StructField("descricao",StringType,true)
)
)
val dfprof = spark.read
.format("csv")
.option("header","true")
.schema(schemaProfissao)
.load("C:\\dados\\arquivo_cidadao\\profissoes.csv")
//join das
pessoas com as profissoes
val dfCidadaoProfissao = dffilter1.join(dfprof,Seq("id_profissao"),"inner")
//profissionais
no estado de sao paulo
val qtdprofissionaissp = dfCidadaoProfissao.groupBy($"descricao").count;
qtdprofissionaissp.collect().map(row => println(row))
}
}
Em caso de dúvidas ou sugestões, escreva nos comentários ou nos mande um email: slothbigdata@gmail.com.
Veja também:
https://slothbigdata.blogspot.com/2019/07/apache-spark-com-scala-manipulacao-do.html