1、读Hive表数据
pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用SQL语句从hive里面查询需要的数据,代码如下:
from pyspark.sql import HiveContext,SparkSession _SPARK_HOST = "spark://spark-master:7077" _APP_NAME = "test" spark_session = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate() hive_context= HiveContext(spark_session ) # 生成查询的SQL语句,这个跟hive的查询语句一样,所以也可以加where等条件语句 hive_database = "database1" hive_table = "test" hive_read = "select * from {}.{}".format(hive_database, hive_table) # 通过SQL语句在hive中查询的数据直接是dataframe的形式 read_df = hive_context.sql(hive_read)
2 、将数据写入hive表
pyspark写hive表有两种方式:
(1)通过SQL语句生成表
from pyspark.sql import SparkSession, HiveContext _SPARK_HOST = "spark://spark-master:7077" _APP_NAME = "test" spark = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate() data = [ (1,"3","145"), (1,"4","146"), (1,"5","25"), (1,"6","26"), (2,"32","32"), (2,"8","134"), (2,"8","134"), (2,"9","137") ] df = spark.createDataFrame(data, ['id', "test_id", 'camera_id']) # method one,default是默认数据库的名字,write_test 是要写到default中数据表的名字 df.registerTempTable('test_hive') sqlContext.sql("create table default.write_test select * from test_hive")
(2)saveastable的方式
# method two # "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表 # mode("append")是在原有表的基础上进行添加数据 df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')
tips:
spark用上面几种方式读写hive时,需要在提交任务时加上相应的配置,不然会报错:
spark-submit --conf spark.sql.catalogImplementation=hive test.py
补充知识:PySpark基于SHC框架读取HBase数据并转成DataFrame
一、首先需要将HBase目录lib下的jar包以及SHC的jar包复制到所有节点的Spark目录lib下
二、修改spark-defaults.conf 在spark.driver.extraClassPath和spark.executor.extraClassPath把上述jar包所在路径加进去
三、重启集群
四、代码
#/usr/bin/python #-*- coding:utf-8 –*- from pyspark import SparkContext from pyspark.sql import SQLContext,HiveContext,SparkSession from pyspark.sql.types import Row,StringType,StructField,StringType,IntegerType from pyspark.sql.dataframe import DataFrame sc = SparkContext(appName="pyspark_hbase") sql_sc = SQLContext(sc) dep = "org.apache.spark.sql.execution.datasources.hbase" #定义schema catalog = """{ "table":{"namespace":"default", "name":"teacher"}, "rowkey":"key", "columns":{ "id":{"cf":"rowkey", "col":"key", "type":"string"}, "name":{"cf":"teacherInfo", "col":"name", "type":"string"}, "age":{"cf":"teacherInfo", "col":"age", "type":"string"}, "gender":{"cf":"teacherInfo", "col":"gender","type":"string"}, "cat":{"cf":"teacherInfo", "col":"cat","type":"string"}, "tag":{"cf":"teacherInfo", "col":"tag", "type":"string"}, "level":{"cf":"teacherInfo", "col":"level","type":"string"} } }""" df = sql_sc.read.options(catalog = catalog).format(dep).load() print ('***************************************************************') print ('***************************************************************') print ('***************************************************************') df.show() print ('***************************************************************') print ('***************************************************************') print ('***************************************************************') sc.stop()
五、解释
数据来源参考请本人之前的文章,在此不做赘述
schema定义参考如图:
六、结果
以上这篇在python中使用pyspark读写Hive数据操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
RTX 5090要首发 性能要翻倍!三星展示GDDR7显存
三星在GTC上展示了专为下一代游戏GPU设计的GDDR7内存。
首次推出的GDDR7内存模块密度为16GB,每个模块容量为2GB。其速度预设为32 Gbps(PAM3),但也可以降至28 Gbps,以提高产量和初始阶段的整体性能和成本效益。
据三星表示,GDDR7内存的能效将提高20%,同时工作电压仅为1.1V,低于标准的1.2V。通过采用更新的封装材料和优化的电路设计,使得在高速运行时的发热量降低,GDDR7的热阻比GDDR6降低了70%。
更新日志
- 魔兽世界奥卡兹岛地牢入口在哪里 奥卡兹岛地牢入口位置一览
- 和文军-丽江礼物[2007]FLAC
- 陈随意2012-今生的伴[豪记][WAV+CUE]
- 罗百吉.2018-我们都一样【乾坤唱片】【WAV+CUE】
- 《怪物猎人:荒野》不加中配请愿书引热议:跪久站不起来了?
- 《龙腾世纪4》IGN 9分!殿堂级RPG作品
- Twitch新规禁止皮套外露敏感部位 主播直接“真身”出镜
- 木吉他.1994-木吉他作品全集【滚石】【WAV+CUE】
- 莫华伦.2022-一起走过的日子【京文】【WAV+CUE】
- 曾淑勤.1989-装在袋子里的回忆【点将】【WAV+CUE】
- 滚石香港黄金十年系列《赵传精选》首版[WAV+CUE][1.1G]
- 雷婷《乡村情歌·清新民谣》1:1母盘直刻[低速原抓WAV+CUE][1.1G]
- 群星 《DJ夜色魅影HQⅡ》天艺唱片[WAV+CUE][1.1G]
- 群星《烧透你的耳朵2》DXD金佰利 [低速原抓WAV+CUE][1.3G]
- 群星《难忘的回忆精选4》宝丽金2CD[WAV+CUE][1.4G]