#!/usr/bin/python3 #-*- coding:utf-8 -*- import sys import json import subprocess import yaml class messageRoute(): def __init__(self): self.message = "" self.headers = "" def route_all(self,message,headers): if headers =="null" or "from_job" not in headers: print("received redis-cli message") messageRoute.appready(message) else: try: headersstr = json.loads(headers) from_job=headersstr["from_job"] #from_ip=headersstr["from_ip"] print("from_job :"+from_job) messageRoute.sendsinkjobs(from_job,message) except json.JSONDecodeError as e: print("Invalid JSON format in headers:", e) @classmethod def appready(self,message): #解析对应的DAG文件 with open('/dag-yaml/csst-msc-l1-mbi.yml', 'r', encoding='utf-8') as file: config = yaml.safe_load(file) # 打印解析后的数据 #print(config) # 检索yaml文件中tasks下有多少模块 key_to_check = 'tasks' subkey_count = 0 if key_to_check in config and isinstance(config[key_to_check], dict): subkey_count = len(config[key_to_check]) print("The key "+key_to_check+" has "+str(subkey_count)+" subkeys.") if subkey_count == 0: print("DAG file is not correct.") else: for i in range(subkey_count): subkey = "task"+str(i+1) upstream_tasks = config['tasks'][subkey].get('upstream_tasks') # print("subkey "+subkey) # 不包含upstream_tasks的模块为流水线的第一个接收消息的模块 if upstream_tasks is None: sink_job=config['tasks'][subkey].get('image') print("The header job is "+sink_job) messageRoute.sendmsg(sink_job,message) @classmethod def sendsinkjobs(self,from_job,message): with open('/dag-yaml/csst-msc-l1-mbi.yml', 'r') as file: config = yaml.safe_load(file) # 检索yaml文件中tasks下有多少模块 key_to_check = 'tasks' subkey_count = 0 if key_to_check in config and isinstance(config[key_to_check], dict): subkey_count = len(config[key_to_check]) print("The key "+key_to_check+" has "+str(subkey_count)+" subkeys.") if subkey_count == 0: print("DAG file is not correct.") return 104 #DAG文件异常 # 检索from_job在DAG中的task标签值 for i in range(subkey_count): subkey = "task"+str(i+1) if(from_job == config[key_to_check][subkey].get('image')): target_value = subkey for i in range(subkey_count): subkey = "task"+str(i+1) upstream_tasks = config[key_to_check][subkey].get('upstream_tasks') # 不包含upstream_tasks的模块为流水线的第一个接收消息的模块 if upstream_tasks is None: continue # 包含upstream_tasks的模块,检索其值是否含有指定模块 else: if target_value in upstream_tasks: sink_job=config[key_to_check][subkey].get('image') messageRoute.sendmsg(sink_job,message) @classmethod def sendmsg(self,job,message): #执行解包操作 command = f"scalebox task add -sink-job={job} {message}" result=subprocess.run(command, shell=True) if result.returncode == 0: print(f"send message {message} to {job}") print("命令执行成功") return result.returncode else: print(f"命令执行失败,返回码为: {result.returncode}") return result.returncode if __name__ == '__main__': parameter = sys.argv message=parameter[1] headers=parameter[2] print('message '+message) print('headers '+headers) #如何接收到headers w=messageRoute() w.route_all(message,headers)