Commit 13b035ca authored by Zhang Xiaoli's avatar Zhang Xiaoli
Browse files

修改传递的消息为json格式{"obsid":"10109400638761","chipid":"12"}

parent fccf0f85
#!/bin/bash #!/bin/bash
# obsid 10160000000 - 10160000136 message=$1
arr=($(echo $1 | tr "-" " ")) echo "message: "$message
obsid="${arr[0]}"
detector="${arr[1]}"
echo "obsid: "$obsid
echo "detector: "$detector
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $obsid $detector python /pipeline/app/run.py $message
exit_code=$? exit_code=$?
......
#!/bin/bash #!/bin/bash
# obsid 10160000000 - 10160000136 message=$1
arr=($(echo $1 | tr "-" " ")) echo "message: "$message
obsid="${arr[0]}"
detector="${arr[1]}"
echo "obsid: "$obsid
echo "detector: "$detector
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $obsid $detector python /pipeline/app/run.py $message
exit_code=$? exit_code=$?
...@@ -39,7 +35,7 @@ fi ...@@ -39,7 +35,7 @@ fi
if [ $exit_code -eq 0 ]; then if [ $exit_code -eq 0 ]; then
echo "finish mbi, start sink-job." echo "finish mbi, start sink-job."
scalebox task add --header dag_run_id=${dag_run_id} --upsert "$1" scalebox task add --header dag_run_id=${dag_run_id} --upsert ${message}
# scalebox task add $1 # scalebox task add $1
else else
echo "finish mbi, exit_code: $exit_code" echo "finish mbi, exit_code: $exit_code"
......
#!/bin/bash #!/bin/bash
# obsid 10160000000 - 10160000136 message=$1
arr=($(echo $1 | tr "-" " ")) echo "message: "$message
obsid="${arr[0]}"
detector="${arr[1]}"
echo "obsid: "$obsid
echo "detector: "$detector
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $obsid $detector python /pipeline/app/run.py $message
exit_code=$? exit_code=$?
...@@ -39,7 +35,8 @@ else ...@@ -39,7 +35,8 @@ else
fi fi
if [ $exit_code -eq 0 ]; then if [ $exit_code -eq 0 ]; then
scalebox task add --header dag_run_id=${dag_run_id} --upsert "$1" scalebox task add --header dag_run_id=${dag_run_id} --upsert ${message}
echo "finish qc0, start sink-job." echo "finish qc0, start sink-job."
else else
echo "finish qc0, exit_code: $exit_code" echo "finish qc0, exit_code: $exit_code"
......
csst-msc-l1-mbi 9 csst-msc-l1-mbi 1
\ No newline at end of file \ No newline at end of file
...@@ -27,24 +27,16 @@ do ...@@ -27,24 +27,16 @@ do
else else
echo "$dag_id not found in appconfig.txt" echo "$dag_id not found in appconfig.txt"
code = 103 code = 103
# exit 103
fi fi
# 提取dag_run_id # 提取dag_run_id
dag_run_id=$(echo "$msg" | jq -r '.parameters.dag_run_id') dag_run_id=$(echo "$msg" | jq -r '.parameters.dag_run_id')
echo "dag_run_id : "$dag_run_id echo "dag_run_id : "$dag_run_id
# 提取obsid message=$(echo "$msg" | jq -c '.parameters.message')
obsid=$(echo "$msg" | jq -r '.parameters.message.obsid')
echo "obsid :"$obsid # 提取message
echo "$message" >> "msgs.txt"
# 提取chipid
chipid=$(echo "$msg" | jq -r '.parameters.message.chipid')
echo "chipid :"$chipid
message=$obsid"-"$chipid
echo "message :"$message
#send-message $obsid #send-message $obsid
job_name="message-router-csst" job_name="message-router-csst"
......
redis-cli -h 10.3.10.28 -a 123456 lpush csst_data_list_zxl "{\"dag_id\": \"csst-msc-l1-mbi\", \"parameters\": {\"dag_run_id\": \"202411111841239406\", \"message\": {\"obsid\": \"11009101682009\", \"chipid\": \"16\"}}}" redis-cli -h 10.3.10.28 -a 123456 lpush csst_data_list_zxl "{\"dag_id\": \"csst-msc-l1-mbi\", \"parameters\": {\"dag_run_id\": \"202411111841239406\", \"message\": {\"obsid\": \"11009101682009\", \"chipid\": \"16\"}}}"
\ No newline at end of file
scalebox task add --app-id 16 --header dag_run_id="202411258348645214" --upsert --sink-job "message-router-csst" "11009101682009-09"
\ No newline at end of file
...@@ -51,8 +51,6 @@ class messageRoute(): ...@@ -51,8 +51,6 @@ class messageRoute():
subkey = "task"+str(i+1) subkey = "task"+str(i+1)
upstream_tasks = config['tasks'][subkey].get('upstream_tasks') upstream_tasks = config['tasks'][subkey].get('upstream_tasks')
# print("subkey "+subkey)
# 不包含upstream_tasks的模块为流水线的第一个接收消息的模块 # 不包含upstream_tasks的模块为流水线的第一个接收消息的模块
if upstream_tasks is None: if upstream_tasks is None:
...@@ -101,6 +99,8 @@ class messageRoute(): ...@@ -101,6 +99,8 @@ class messageRoute():
def sendmsg(self,dag_run_id,job,message): def sendmsg(self,dag_run_id,job,message):
#执行解包操作 #执行解包操作
print("sendmsg dag_run_id is "+dag_run_id) print("sendmsg dag_run_id is "+dag_run_id)
message = "'"+message+"'"
command = f"scalebox task add --header dag_run_id={dag_run_id} --upsert --sink-job={job} {message}" command = f"scalebox task add --header dag_run_id={dag_run_id} --upsert --sink-job={job} {message}"
print("command : "+command) print("command : "+command)
result=subprocess.run(command, shell=True) result=subprocess.run(command, shell=True)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment