diff --git a/L1/pipelines/csst-msc-l1-mbi/message-router/messageRoute.py b/L1/pipelines/csst-msc-l1-mbi/message-router/messageRoute.py index 56390681a5fe8f1be51b59d526241b7463188b1e..435ebe49bfa8027beb12e6c9b4c65c9cb192ce89 100644 --- a/L1/pipelines/csst-msc-l1-mbi/message-router/messageRoute.py +++ b/L1/pipelines/csst-msc-l1-mbi/message-router/messageRoute.py @@ -99,19 +99,34 @@ class messageRoute(): def sendmsg(self,dag_run_id,job,message): #执行解包操作 print("sendmsg dag_run_id is "+dag_run_id) + messageRoute.append_dagrunid(dag_run_id) message = "'"+message+"'" - command = f"scalebox task add --header dag_run_id={dag_run_id} --upsert --sink-job={job} {message}" + command = f"scalebox task add --header dag_run_id={dag_run_id} --header repeatable=yes --upsert --sink-job={job} {message}" print("command : "+command) result=subprocess.run(command, shell=True) if result.returncode == 0: print(f"send message {message} to {job}") + # dag_run_id记入/work/extra-attributes.txt print("命令执行成功") return result.returncode else: print(f"命令执行失败,返回码为: {result.returncode}") return result.returncode + @classmethod + def append_dagrunid(self,dag_run_id): + file_path='/work/extra-attributes.txt' + content_to_append=f"dag_run_id:{dag_run_id}\n" + # 以追加模式打开文件 + try: + with open(file_path, 'a', encoding='utf-8') as file: + # 追加内容 + file.write(content_to_append) + print(f"追加完成,dag_run_id : {dag_run_id}") + except IOError as e: + print(f"写入文件/work/extra-attributes.txt时发生错误:{e}") + if __name__ == '__main__': parameter = sys.argv message=parameter[1]