#!/usr/bin/python3
# -*- coding:utf-8 -*-
"""Forward a message to the next job(s) of a DAG-described pipeline.

Invoked as ``<script> MESSAGE HEADERS`` where HEADERS is a JSON object.
The DAG file is read to decide which job images receive the message, and
each delivery is performed with ``scalebox task add``.
"""
import json
import shlex
import subprocess
import sys

import yaml


class messageRoute():
    """Route one message according to the ``tasks`` section of the DAG file."""

    # Location of the DAG description consulted for every routing decision.
    DAG_FILE = '/dag-yaml/csst-msc-l1-mbi.yml'
    # File that collects one "dag_run_id:<id>" line per forwarded message.
    EXTRA_ATTRIBUTES_FILE = '/work/extra-attributes.txt'
    # Value returned by sendsinkjobs when the DAG file has no usable tasks.
    DAG_ERROR = 104

    def __init__(self):
        self.message = ""
        self.headers = ""

    def route_all(self, message, headers):
        """Dispatch *message* based on the JSON text in *headers*.

        Headers without a ``from_job`` key are treated as an initial message
        and go to the entry task(s) of the DAG (``appready``); otherwise the
        message goes to the tasks downstream of ``from_job``
        (``sendsinkjobs``).

        Args:
            message: Message body to forward.
            headers: JSON text; must decode to an object with ``dag_run_id``.

        Returns:
            None. Malformed headers are reported on stdout and nothing is
            forwarded.
        """
        try:
            parsed = json.loads(headers)
        except json.JSONDecodeError as e:
            print("Invalid JSON format in headers:", e)
            return

        # "null" (and any other non-object JSON) carries no fields at all.
        fields = parsed if isinstance(parsed, dict) else {}
        from_job = fields.get("from_job")
        if from_job is None:
            print("received redis-cli message")

        dag_run_id = fields.get("dag_run_id")
        if dag_run_id is None:
            print("Missing dag_run_id in headers:", headers)
            return

        if from_job is None:
            self.appready(dag_run_id, message)
        else:
            # from_ip is also present in the headers but is not used here.
            print(f"from_job :{from_job} dag_run_id :{dag_run_id}")
            self.sendsinkjobs(dag_run_id, from_job, message)

    @classmethod
    def _load_tasks(cls):
        """Return the ``tasks`` mapping of the DAG file, or ``{}`` if unusable.

        An empty file, a non-mapping document or a non-mapping ``tasks`` entry
        all yield ``{}``. Errors opening or parsing the file propagate.
        """
        with open(cls.DAG_FILE, 'r', encoding='utf-8') as file:
            config = yaml.safe_load(file)
        tasks = config.get('tasks') if isinstance(config, dict) else None
        return tasks if isinstance(tasks, dict) else {}

    @staticmethod
    def _upstream_labels(upstream_tasks):
        """Return the task labels named by an ``upstream_tasks`` value.

        NOTE(review): a plain string is assumed to be a comma/whitespace
        separated list of labels -- confirm against the DAG schema. Splitting
        avoids the substring trap where "task1" would match "task10".
        """
        if isinstance(upstream_tasks, str):
            return upstream_tasks.replace(",", " ").split()
        try:
            return list(upstream_tasks)
        except TypeError:
            return [upstream_tasks]

    @classmethod
    def appready(cls, dag_run_id, message):
        """Send *message* to every DAG task that has no ``upstream_tasks``.

        Args:
            dag_run_id: Run identifier attached to each forwarded message.
            message: Message body to forward.

        Returns:
            None.
        """
        # Parse the DAG file and count the modules listed under "tasks".
        tasks = cls._load_tasks()
        print(f"The key tasks has {len(tasks)} subkeys.")
        if not tasks:
            print("DAG file is not correct.")
            return

        for task in tasks.values():
            if not isinstance(task, dict):
                continue
            # A module without upstream_tasks is the first receiver of the
            # pipeline.
            if task.get('upstream_tasks') is None:
                sink_job = task.get('image')
                print(f"The header job is {sink_job}")
                cls.sendmsg(dag_run_id, sink_job, message)

    @classmethod
    def sendsinkjobs(cls, dag_run_id, from_job, message):
        """Send *message* to every task directly downstream of *from_job*.

        Args:
            dag_run_id: Run identifier attached to each forwarded message.
            from_job: Image name of the job that produced the message.
            message: Message body to forward.

        Returns:
            104 when the DAG file has no usable ``tasks`` mapping, otherwise
            None (including when *from_job* is not found in the DAG).
        """
        tasks = cls._load_tasks()
        print(f"The key tasks has {len(tasks)} subkeys. \ndag_run_id is {dag_run_id}")
        if not tasks:
            print("DAG file is not correct.")
            return cls.DAG_ERROR  # abnormal DAG file

        # Find the task label whose image is from_job (the last match wins).
        target_value = None
        for label, task in tasks.items():
            if isinstance(task, dict) and task.get('image') == from_job:
                target_value = label
        if target_value is None:
            print(f"from_job {from_job} is not the image of any task in the DAG.")
            return None

        for task in tasks.values():
            if not isinstance(task, dict):
                continue
            upstream_tasks = task.get('upstream_tasks')
            # Entry modules have no upstream and can never be a sink here.
            if upstream_tasks is None:
                continue
            if target_value in cls._upstream_labels(upstream_tasks):
                cls.sendmsg(dag_run_id, task.get('image'), message)
        return None

    @classmethod
    def sendmsg(cls, dag_run_id, job, message):
        """Record *dag_run_id* and hand *message* to *job* via ``scalebox``.

        Args:
            dag_run_id: Value of the ``dag_run_id`` header on the new task.
            job: Sink job name passed as ``--sink-job``.
            message: Task body, passed as a single argument.

        Returns:
            The exit status of ``scalebox``; 127 if it could not be started.
        """
        print(f"sendmsg dag_run_id is {dag_run_id}")
        # The dag_run_id is recorded in /work/extra-attributes.txt before the
        # command runs.
        cls.append_dagrunid(dag_run_id)

        # An argument list with shell=False keeps quotes or shell
        # metacharacters in the message from being interpreted.
        argv = [
            "scalebox", "task", "add",
            "--header", f"dag_run_id={dag_run_id}",
            "--header", "repeatable=yes",
            "--upsert",
            f"--sink-job={job}",
            message,
        ]
        print("command : " + " ".join(shlex.quote(str(arg)) for arg in argv))

        try:
            returncode = subprocess.run(argv, shell=False).returncode
        except OSError as e:
            # A shell reports a command it cannot launch as 127; keep that
            # convention so callers still receive an integer status.
            print(f"unable to run scalebox: {e}")
            returncode = 127

        if returncode == 0:
            print(f"send message '{message}' to {job}")
            print("命令执行成功")
        else:
            print(f"命令执行失败,返回码为: {returncode}")
        return returncode

    @classmethod
    def append_dagrunid(cls, dag_run_id):
        """Append a ``dag_run_id:<id>`` line to the extra-attributes file.

        Write failures are reported on stdout and otherwise ignored.
        """
        content_to_append = f"dag_run_id:{dag_run_id}\n"
        try:
            # Append mode keeps the lines written by earlier messages.
            with open(cls.EXTRA_ATTRIBUTES_FILE, 'a', encoding='utf-8') as file:
                file.write(content_to_append)
            print(f"追加完成,dag_run_id : {dag_run_id}")
        except IOError as e:
            print(f"写入文件/work/extra-attributes.txt时发生错误:{e}")


def main(argv):
    """Route the message given on the command line; return the exit status."""
    if len(argv) < 3:
        prog = argv[0] if argv else "message_route"
        print(f"usage: {prog} MESSAGE HEADERS", file=sys.stderr)
        return 2
    message, headers = argv[1], argv[2]
    print('message ' + message)
    print('headers ' + headers)
    messageRoute().route_all(message, headers)
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))