Commit d61800aa authored by Zhang Xiaoli's avatar Zhang Xiaoli
Browse files

header增加dag_run_id字段

parent 5ca753ea
...@@ -14,19 +14,22 @@ class messageRoute(): ...@@ -14,19 +14,22 @@ class messageRoute():
if headers =="null" or "from_job" not in headers: if headers =="null" or "from_job" not in headers:
print("received redis-cli message") print("received redis-cli message")
messageRoute.appready(message) headersstr = json.loads(headers)
dag_run_id=headersstr["dag_run_id"]
messageRoute.appready(dag_run_id,message)
else: else:
try: try:
headersstr = json.loads(headers) headersstr = json.loads(headers)
from_job=headersstr["from_job"] from_job=headersstr["from_job"]
#from_ip=headersstr["from_ip"] #from_ip=headersstr["from_ip"]
print("from_job :"+from_job) dag_run_id=headersstr["dag_run_id"]
messageRoute.sendsinkjobs(from_job,message) print("from_job :"+from_job+" dag_run_id :"+dag_run_id)
messageRoute.sendsinkjobs(dag_run_id,from_job,message)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
print("Invalid JSON format in headers:", e) print("Invalid JSON format in headers:", e)
@classmethod @classmethod
def appready(self,message): def appready(self,dag_run_id,message):
#解析对应的DAG文件 #解析对应的DAG文件
with open('/dag-yaml/csst-msc-l1-mbi.yml', 'r', encoding='utf-8') as file: with open('/dag-yaml/csst-msc-l1-mbi.yml', 'r', encoding='utf-8') as file:
config = yaml.safe_load(file) config = yaml.safe_load(file)
...@@ -55,10 +58,10 @@ class messageRoute(): ...@@ -55,10 +58,10 @@ class messageRoute():
if upstream_tasks is None: if upstream_tasks is None:
sink_job=config['tasks'][subkey].get('image') sink_job=config['tasks'][subkey].get('image')
print("The header job is "+sink_job) print("The header job is "+sink_job)
messageRoute.sendmsg(sink_job,message) messageRoute.sendmsg(dag_run_id,sink_job,message)
@classmethod @classmethod
def sendsinkjobs(self,from_job,message): def sendsinkjobs(self,dag_run_id,from_job,message):
with open('/dag-yaml/csst-msc-l1-mbi.yml', 'r') as file: with open('/dag-yaml/csst-msc-l1-mbi.yml', 'r') as file:
config = yaml.safe_load(file) config = yaml.safe_load(file)
...@@ -68,7 +71,7 @@ class messageRoute(): ...@@ -68,7 +71,7 @@ class messageRoute():
if key_to_check in config and isinstance(config[key_to_check], dict): if key_to_check in config and isinstance(config[key_to_check], dict):
subkey_count = len(config[key_to_check]) subkey_count = len(config[key_to_check])
print("The key "+key_to_check+" has "+str(subkey_count)+" subkeys.") print("The key "+key_to_check+" has "+str(subkey_count)+" subkeys. dag_run_id is "+dag_run_id)
if subkey_count == 0: if subkey_count == 0:
print("DAG file is not correct.") print("DAG file is not correct.")
...@@ -92,12 +95,14 @@ class messageRoute(): ...@@ -92,12 +95,14 @@ class messageRoute():
else: else:
if target_value in upstream_tasks: if target_value in upstream_tasks:
sink_job=config[key_to_check][subkey].get('image') sink_job=config[key_to_check][subkey].get('image')
messageRoute.sendmsg(sink_job,message) messageRoute.sendmsg(dag_run_id,sink_job,message)
@classmethod @classmethod
def sendmsg(self,job,message): def sendmsg(self,dag_run_id,job,message):
#执行解包操作 #执行解包操作
command = f"scalebox task add -sink-job={job} {message}" print("sendmsg dag_run_id is "+dag_run_id)
command = f"scalebox task add --header dag_run_id={dag_run_id} --sink-job={job} {message}"
print("command : "+command)
result=subprocess.run(command, shell=True) result=subprocess.run(command, shell=True)
if result.returncode == 0: if result.returncode == 0:
print(f"send message {message} to {job}") print(f"send message {message} to {job}")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment