• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

DATAXMongoDB增量数据写入到mysql

武飞扬头像
michelle_ycx
帮助1

项目场景:

简述:

使用DATAX进行Mongo的数据抽取,然后写入到mysql中,其中会牵涉到全量数据的写入和增量数据的写入。
全量 数据的写入我们只需要正常写JSON模板即可,使用column将要获取的字段。但是考虑到有增量数据获取的时候,需要增加限定条件。

{

  "job": {

    "content": [

      {

        "reader": {

          "name": "mongodbreader",

          "parameter": {

            "address":["IP:端口号"],
			"toUri":true,"ssl":false,
			"trustStore":"",
			"trustStorePassword":"",
			"userName":"xxx",
			"userPassword":"",
			"dbName":"xxxx",
			"collectionName": "xxxx",
      "column":[
   {
    "name":"CRT_TM",
    "type":"string"
   }
      ]
          }
        },
                
		"writer": {
			
			"name": "mysqlwriter",             
			"parameter": {
            "column": [ 
"CRT_TM"
			],
                        
			"connection": [{
                             "jdbcUrl": "xxxx",
                             "table": ["xxxx"]
                            }],
            "preSql": [],
            "session": [ "set session sql_mode='ANSI'"],
            "username": "xx",
			"password": "",
            "writeMode": "insert"
                    }
                }
      } 
    ]
,
    "setting": {

      "errorLimit": {

        "percentage": 0,

        "record": 0

      },

      "speed": {

        "channel": 1

      }

    }
  }
}
学新通

问题描述

例如要获取近两天的mongo中的数据作为增量数据写入到mysql中。
mongodbreader的JSON中的parameter中没有像mysql、Oracle一样的querySQL,可以通过写SQL来限制日期时间来获取数据。

"reader": {

          "name": "oraclereader",

          "parameter": {

            "connection": [

              {

                "jdbcUrl": [

                  "xxx"

                ],

                "querySql": [
"
SELECT
CASE 
WHEN A.MZZY_FLAG='B'
THEN A.JZ_LSH ELSE NULL
END AS INHOS_NO ,--- 住院号
CASE 
WHEN A.MZZY_FLAG='M'
THEN A.JZ_LSH ELSE NULL
END AS OTPT_EMG_NO ,--- 门(急)诊号
A.PAT_ID AS PID ,--- 患者ID
'' AS MDCR_PAY_WAY_CD ,--- 医保支付方式代码
'' AS MDCR_PAY_WAY_NM ,--- 医保支付方式名称
A.TCJJ_ZF  AS MDCR_OVRL_PLNG_PAY_FEE ,--- 医保统筹基金支付金额
A.DBJZ_ZF  AS CRTCL_ILLNS_INSRNC_FEE ,--- 大病保险金额
'' AS MDC_ASSTS_FEE ,--- 医疗救助金额
A.GWYBZ_ZF AS CVL_SVNT_MDC_SBSDY_FEE ,--- 公务员医疗补助金额
'' AS BIG_SPLMT_FEE ,--- 大额补充金额
'' AS MNFCT_SPLMT_FEE ,--- 企业补充金额
(A.FUND_PAY_SUMAMT-A.TCJJ_ZF-A.DBJZ_ZF-A.GWYBZ_ZF) AS OTHR_PAY_FEE ,--- 其他基金支付金额
A.ZL_JE AS SLF_PAY ,--- 个人自付
A.ZF_JE AS SLF_FEE ,--- 个人自费
'' AS ACNT_PAY ,--- 个人账户支付
A.GRXJ_ZF AS CSH_PAY ,--- 个人现金支付
-- A.FY_ZE,--总额
-- A.BXJE,--报销金额
A.JS_RQ AS FEE_STLMT_DT ,--- 费用结算日期
    A.CJSJ AS CRT_TM, -- 创建时间
    A.CJR AS CRT_STFF_CD, -- 创建职工工号
    (SELECT XM FROM DICTMANAGE.DICT_EMPE WHERE KEYNO = A.CJR AND ROWNUM = 1) AS CRT_STFF_NM, -- 创建职工姓名
    A.ZHGXSJ AS UPD_DT, -- 更新日期
    A.ZHGXR AS UPD_STFF_CD, -- 更新职工工号
    (SELECT XM FROM DICTMANAGE.DICT_EMPE WHERE KEYNO = A.ZHGXR AND ROWNUM = 1) AS UPD_STFF_NM, -- 更新职工姓名
CASE WHEN A.MISTATUS IN ('MI_DEL')  THEN '1' ELSE '0' END AS INVLD_FLG --- 作废标识
-- A.MISTATUS,--(医保状态PAY_OK,付款成功,RECEDE_OK退款成功,CANCEL取消医保,MI_ERROR医保出错,RECEDE_ERROR退费出错,NEW新的记录,MI_COST医保收费,MI_OK医保确认提交,MI_DEL数据删除)
-- CASE WHEN A.MISTATUS ='PAY_OK'  THEN '付款成功' 
-- WHEN A.MISTATUS ='RECEDE_OK'  THEN '退款成功' 
-- WHEN A.MISTATUS ='CANCEL'  THEN '取消医保' 
-- WHEN A.MISTATUS ='MI_ERROR'  THEN '医保出错' 
-- WHEN A.MISTATUS ='RECEDE_ERROR'  THEN '退费出错' 
-- WHEN A.MISTATUS ='NEW'  THEN '新的记录' 
-- WHEN A.MISTATUS ='MI_COST'  THEN '医保收费' 
-- WHEN A.MISTATUS ='MI_OK'  THEN '医保确认提交' 
-- WHEN A.MISTATUS ='MI_DEL'  THEN '数据删除' 
-- END AS MISTATUS_NM
FROM HIS.MI_TRANS_INFO A
WHERE A.PAT_ID IS NOT NULL 
"
]

              }

            ],

            "username": "xxx",

            "password": "xxx",

            "where": "",

            "splitPk": "PRM_KEY",

            "fetchSize": 512,
          }

        }
学新通

原因分析:

参考DATAX的阅读文档中,文章有提到这样的一个参数。

学新通

解决方案:

使用query参数设定时间范围:
“dateNum”:-2,
“dateType”:“day”,
“query”:“{ “CRT_TM”:{“KaTeX parse error: Expected group as argument to '\"' at end of input: gte\":{\"date”:”%{nextDate}“}}}”,

具体实现如下:

"content": [

      {

        "reader": {

          "name": "mongodbreader",

          "parameter": {

      "column":[
   {
    "name":"CRT_TM",
    "type":"string"
   }
      ],
						"dateNum":-2,
                        "dateType":"day",
                        "query":"{ \"CRT_TM\":{\"$gte\":{\"$date\":\"%{nextDate}\"}}}",

          }
        },
                
		"writer": {
			
			"name": "mysqlwriter",             
			"parameter": {        
            "writeMode": "replace"
                    }
                }
      } 
    ]
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgabifb
系列文章
更多 icon
同类精品
更多 icon
继续加载