圆月山庄资源网 Design By www.vgjia.com
执行 datax 作业,创建执行文件,在 crontab 中每天1点(下面有关系)执行:
其中 job_start 及 job_finish 这两行记录是自己添加的,为了方便识别出哪张表。
#!/bin/bash source /etc/profile user1="root" pass1="pwd" user2="root" pass2="pwd" job_path="/opt/datax/job/" jobfile=( job_table_a.json job_table_b.json ) for filename in ${jobfile[@]} do echo "job_start: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}" python /opt/datax/bin/datax.py -p "-Duser1=${user1} -Dpass1=${pass1} -Duser2=${user2} -Dpass2=${pass2}" ${job_path}${filename} echo "job_finish: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}" done # 0 1 * * * /opt/datax/job/dc_to_ods_incr.sh /opt/datax/job/log/dc_to_ods_incr_$(date +\%Y\%m\%d_\%H\%M\%S).log 2>&1 # egrep '任务|速度|总数|job_start|job_finish' /opt/datax/job/log/
datax 执行日志:
job_start: 2018-08-08 01:13:28 job_table_a.json 任务启动时刻 : 2018-08-08 01:13:28 任务结束时刻 : 2018-08-08 01:14:49 任务总计耗时 : 81s 任务平均流量 : 192.82KB/s 记录写入速度 : 1998rec/s 读出记录总数 : 159916 读写失败总数 : 0 job_finish: 2018-08-08 01:14:49 job_table_a.json job_start: 2018-08-08 01:14:49 job_table_b.json 任务启动时刻 : 2018-08-08 01:14:50 任务结束时刻 : 2018-08-08 01:15:01 任务总计耗时 : 11s 任务平均流量 : 0B/s 记录写入速度 : 0rec/s 读出记录总数 : 0 读写失败总数 : 0 job_finish: 2018-08-08 01:15:01 job_table_b.json
接下来读取这些信息保存到数据库,在数据库中创建表:
CREATE TABLE `datax_job_result` ( `log_file` varchar(200) DEFAULT NULL, `job_file` varchar(200) DEFAULT NULL, `start_time` datetime DEFAULT NULL, `end_time` datetime DEFAULT NULL, `seconds` int(11) DEFAULT NULL, `traffic` varchar(50) DEFAULT NULL, `write_speed` varchar(50) DEFAULT NULL, `read_record` int(11) DEFAULT NULL, `failed_record` int(11) DEFAULT NULL, `job_start` varchar(200) DEFAULT NULL, `job_finish` varchar(200) DEFAULT NULL, `insert_time` datetime DEFAULT CURRENT_TIMESTAMP ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
定时执行以下文件,因为 datax 作业 1 点执行,为了获取一天内最新生产的日志,脚本中取 82800内生产的日志文件,及23 小时内生产的那个最新日志。所以一天内任何时间执行都可以。此文件也是定时每天执行(判断 datax 作业完成后执行)
#!/usr/bin/python # -*- coding: UTF-8 -*- # 0 5 * * * source /etc/profile && /usr/bin/python2.7 /opt/datax/job/save_log_to_db.py > /dev/null 2>&1 import re import os import sqlalchemy import pandas as pd import datetime as dt def save_to_db(df): engine = sqlalchemy.create_engine("mysql+pymysql://root:pwd@localhost:3306/test", encoding="utf-8") df.to_sql("datax_job_result", engine, index=False, if_exists='append') def get_the_latest_file(path): t0 = dt.datetime.utcfromtimestamp(0) d2 = (dt.datetime.now() - t0).total_seconds() d1 = d2 - 82800 for (dirpath, dirnames, filenames) in os.walk(path): for filename in sorted(filenames, reverse = True): if filename.endswith(".log"): f = os.path.join(dirpath,filename) ctime = os.stat(f)[-1] if ctime>=d1 and ctime <=d2: return f def get_job_result_from_logfile(path): result = pd.DataFrame(columns=['log_file','job_file','start_time','end_time','seconds','traffic','write_speed','read_record','failed_record','job_start','job_finish']) log_file = get_the_latest_file(path) index = 0 content = open(log_file, "r") for line in content: result.loc[index, 'log_file'] = log_file if re.compile(r'job_start').match(line): result.loc[index, 'job_file'] = line.split(' ')[4].strip() result.loc[index, 'job_start'] = line, elif re.compile(r'任务启动时刻').match(line): result.loc[index, 'start_time'] = line.split('刻')[1].strip().split(' ')[1].strip() + ' ' + line.split('刻')[1].strip().split(' ')[2].strip() elif re.compile(r'任务结束时刻').match(line): result.loc[index, 'end_time'] = line.split('刻')[1].strip().split(' ')[1].strip() + ' ' + line.split('刻')[1].strip().split(' ')[2].strip() elif re.compile(r'任务总计耗时').match(line): result.loc[index, 'seconds'] = line.split(':')[1].strip().replace('s','') elif re.compile(r'任务平均流量').match(line): result.loc[index, 'traffic'] = line.split(':')[1].strip() elif re.compile(r'记录写入速度').match(line): result.loc[index, 'write_speed'] = line.split(':')[1].strip() elif re.compile(r'读出记录总数').match(line): result.loc[index, 'read_record'] = line.split(':')[1].strip() elif re.compile(r'读写失败总数').match(line): result.loc[index, 'failed_record'] = line.split(':')[1].strip() elif re.compile(r'job_finish').match(line): result.loc[index, 'job_finish'] = line, index = index + 1 else: pass save_to_db(result) get_job_result_from_logfile("/opt/datax/job/log")
以上这篇Python 获取 datax 执行结果保存到数据库的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
圆月山庄资源网 Design By www.vgjia.com
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
圆月山庄资源网 Design By www.vgjia.com
暂无评论...
稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!
昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。
这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。
而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?
更新日志
2024年11月05日
2024年11月05日
- 雨林唱片《赏》新曲+精选集SACD版[ISO][2.3G]
- 罗大佑与OK男女合唱团.1995-再会吧!素兰【音乐工厂】【WAV+CUE】
- 草蜢.1993-宝贝对不起(国)【宝丽金】【WAV+CUE】
- 杨培安.2009-抒·情(EP)【擎天娱乐】【WAV+CUE】
- 周慧敏《EndlessDream》[WAV+CUE]
- 彭芳《纯色角3》2007[WAV+CUE]
- 江志丰2008-今生为你[豪记][WAV+CUE]
- 罗大佑1994《恋曲2000》音乐工厂[WAV+CUE][1G]
- 群星《一首歌一个故事》赵英俊某些作品重唱企划[FLAC分轨][1G]
- 群星《网易云英文歌曲播放量TOP100》[MP3][1G]
- 方大同.2024-梦想家TheDreamer【赋音乐】【FLAC分轨】
- 李慧珍.2007-爱死了【华谊兄弟】【WAV+CUE】
- 王大文.2019-国际太空站【环球】【FLAC分轨】
- 群星《2022超好听的十倍音质网络歌曲(163)》U盘音乐[WAV分轨][1.1G]
- 童丽《啼笑姻缘》头版限量编号24K金碟[低速原抓WAV+CUE][1.1G]