Commit 1f1cdb42 authored by Zhang Xiaoli's avatar Zhang Xiaoli
Browse files

修正消息中data_list过长的问题,更新公有云中代码。

parent 4a9bac26
...@@ -2,7 +2,20 @@ ...@@ -2,7 +2,20 @@
message=$1 message=$1
echo "message: "$message echo "message: "$message
headers=$(echo "$2" | sed 's/\\//g')
echo "$headers" | jq '.'
echo "headers: $headers"
has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
data_list=$(echo "$headers" | jq -r '.data_list')
fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
# 将 data_list 重新插入回 message
message=$(echo "$message" | jq --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
fi
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $message python /pipeline/app/run.py $message
......
IMAGE_NAME:=csu-harbor.csst.nao:10443/cnic/csst-cpic-l1 IMAGE_NAME:=csu-harbor.csst.nao:10443/cnic/csst-cpic-l1
IMAGE_PATH:=/nfs/tmp/scalebox-images
all: build push dist all: build push dist
build: build:
docker pull csu-harbor.csst.nao:10443/csst/csst-cpic-l1 docker pull csu-harbor.csst.nao:10443/csst/csst-cpic-l1
......
...@@ -2,7 +2,20 @@ ...@@ -2,7 +2,20 @@
message=$1 message=$1
echo "message: "$message echo "message: "$message
headers=$(echo "$2" | sed 's/\\//g')
echo "$headers" | jq '.'
echo "headers: $headers"
has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
data_list=$(echo "$headers" | jq -r '.data_list')
fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
# 将 data_list 重新插入回 message
message=$(echo "$message" | jq --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
fi
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $message python /pipeline/app/run.py $message
......
...@@ -2,6 +2,22 @@ ...@@ -2,6 +2,22 @@
message=$1 message=$1
echo "message: "$message echo "message: "$message
#headers=$2
headers=$(echo "$2" | sed 's/\\//g')
# 步骤2: 添加引号(如果引号缺失)
#fixed_msg=$(echo "$step1" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
echo "$headers" | jq '.'
echo "headers: $headers"
has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
data_list=$(echo "$headers" | jq -r '.data_list')
fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
# 将 data_list 重新插入回 message
message=$(echo "$message" | jq --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
fi
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $message python /pipeline/app/run.py $message
...@@ -38,7 +54,7 @@ echo "dag_run_id:$dag_run_id" >> /work/extra-attributes.txt ...@@ -38,7 +54,7 @@ echo "dag_run_id:$dag_run_id" >> /work/extra-attributes.txt
if [ $exit_code -eq 0 ]; then if [ $exit_code -eq 0 ]; then
# scalebox task add --header dag_run_id=${dag_run_id} --header repeatable=yes --upsert ${message} # scalebox task add --header dag_run_id=${dag_run_id} --header repeatable=yes --upsert ${message}
echo "$1" > ${WORK_DIR}/messages.txt # echo "$1" > ${WORK_DIR}/messages.txt
echo "finish hstdm-l1." echo "finish hstdm-l1."
else else
echo "finish hstdm-l1, exit_code: $exit_code" echo "finish hstdm-l1, exit_code: $exit_code"
......
...@@ -2,7 +2,20 @@ ...@@ -2,7 +2,20 @@
message=$1 message=$1
echo "message: "$message echo "message: "$message
headers=$(echo "$2" | sed 's/\\//g')
echo "$headers" | jq '.'
echo "headers: $headers"
has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
data_list=$(echo "$headers" | jq -r '.data_list')
fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
# 将 data_list 重新插入回 message
message=$(echo "$message" | jq --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
fi
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $message python /pipeline/app/run.py $message
...@@ -36,8 +49,24 @@ fi ...@@ -36,8 +49,24 @@ fi
echo "dag_run_id:$dag_run_id" >> /work/extra-attributes.txt echo "dag_run_id:$dag_run_id" >> /work/extra-attributes.txt
pattern='"sorted_tag":"([^"]+)"'
if [[ $headers =~ $pattern ]]; then
sorted_tag="${BASH_REMATCH[1]}"
echo "sorted_tag: $sorted_tag"
else
# no sorted_tag in json
sorted_tag=1
fi
if [ $exit_code -eq 0 ]; then if [ $exit_code -eq 0 ]; then
# scalebox task add --header dag_run_id=${dag_run_id} --header repeatable=yes --upsert ${message} # scalebox task add --header dag_run_id=${dag_run_id} --header repeatable=yes --upsert ${message}
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
strdatalist=$(echo "$fixed_data_list" | tr -d '[:space:]')
scalebox task add --header sorted_tag=${sorted_tag} --header dag_run_id=${dag_run_id} --header data_list=${strdatalist} --header repeatable=yes --upsert $1
else
scalebox task add --header sorted_tag=${sorted_tag} --header dag_run_id=${dag_run_id} --header repeatable=yes --upsert ${message}
fi
echo "finish ifs-l1-qc0." echo "finish ifs-l1-qc0."
else else
echo "finish ifs-l1-qc0, exit_code: $exit_code" echo "finish ifs-l1-qc0, exit_code: $exit_code"
......
...@@ -2,7 +2,20 @@ ...@@ -2,7 +2,20 @@
message=$1 message=$1
echo "message: "$message echo "message: "$message
headers=$(echo "$2" | sed 's/\\//g')
echo "$headers" | jq '.'
echo "headers: $headers"
has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
data_list=$(echo "$headers" | jq -r '.data_list')
fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
# 将 data_list 重新插入回 message
message=$(echo "$message" | jq --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
fi
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $message python /pipeline/app/run.py $message
# python l1_pipeline_script_ifs_rss_simdata.py $obsid # python l1_pipeline_script_ifs_rss_simdata.py $obsid
...@@ -33,6 +46,14 @@ else ...@@ -33,6 +46,14 @@ else
# no dag_run_id in json # no dag_run_id in json
dag_run_id="" dag_run_id=""
fi fi
pattern='"sorted_tag":"([^"]+)"'
if [[ $headers =~ $pattern ]]; then
sorted_tag="${BASH_REMATCH[1]}"
echo "sorted_tag: $sorted_tag"
else
# no sorted_tag in json
sorted_tag=1
fi
echo "dag_run_id:$dag_run_id" >> /work/extra-attributes.txt echo "dag_run_id:$dag_run_id" >> /work/extra-attributes.txt
......
...@@ -2,7 +2,20 @@ ...@@ -2,7 +2,20 @@
message=$1 message=$1
echo "message: "$message echo "message: "$message
headers=$(echo "$2" | sed 's/\\//g')
echo "$headers" | jq '.'
echo "headers: $headers"
has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
data_list=$(echo "$headers" | jq -r '.data_list')
fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
# 将 data_list 重新插入回 message
message=$(echo "$message" | jq --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
fi
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $message python /pipeline/app/run.py $message
# python l1_pipeline_script_ifs_rss_simdata.py $obsid # python l1_pipeline_script_ifs_rss_simdata.py $obsid
......
...@@ -5,6 +5,20 @@ rm -rf /work/* ...@@ -5,6 +5,20 @@ rm -rf /work/*
message=$1 message=$1
echo "message: "$message echo "message: "$message
headers=$(echo "$2" | sed 's/\\//g')
echo "$headers" | jq '.'
echo "headers: $headers"
has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
data_list=$(echo "$headers" | jq -r '.data_list')
fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
# 将 data_list 重新插入回 message
message=$(echo "$message" | jq --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
fi
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $message python /pipeline/app/run.py $message
......
...@@ -6,6 +6,21 @@ rm -rf /work/* ...@@ -6,6 +6,21 @@ rm -rf /work/*
message=$1 message=$1
echo "message: "$message echo "message: "$message
headers=$(echo "$2" | sed 's/\\//g')
echo "$headers" | jq '.'
echo "headers: $headers"
has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
data_list=$(echo "$headers" | jq -r '.data_list')
fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
# 将 data_list 重新插入回 message
message=$(echo "$message" | jq --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
fi
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $message python /pipeline/app/run.py $message
...@@ -48,9 +63,14 @@ else ...@@ -48,9 +63,14 @@ else
fi fi
if [ $exit_code -eq 0 ]; then if [ $exit_code -eq 0 ]; then
echo "finish ast, start sink-job." if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
strdatalist=$(echo "$fixed_data_list" | tr -d '[:space:]')
scalebox task add --header sorted_tag=${sorted_tag} --header dag_run_id=${dag_run_id} --header data_list=${strdatalist} --header repeatable=yes --upsert $1
else
scalebox task add --header sorted_tag=${sorted_tag} --header dag_run_id=${dag_run_id} --header repeatable=yes --upsert ${message} scalebox task add --header sorted_tag=${sorted_tag} --header dag_run_id=${dag_run_id} --header repeatable=yes --upsert ${message}
# scalebox task add $1 fi
echo "finish ast, start sink-job."
else else
echo "finish ast, exit_code: $exit_code" echo "finish ast, exit_code: $exit_code"
fi fi
......
...@@ -5,7 +5,20 @@ rm -rf /work/* ...@@ -5,7 +5,20 @@ rm -rf /work/*
message=$1 message=$1
echo "message: "$message echo "message: "$message
headers=$(echo "$2" | sed 's/\\//g')
echo "$headers" | jq '.'
echo "headers: $headers"
has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
data_list=$(echo "$headers" | jq -r '.data_list')
fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
# 将 data_list 重新插入回 message
message=$(echo "$message" | jq --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
fi
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $message python /pipeline/app/run.py $message
......
FROM zjlab-harbor.csst.nao:10443/csst/csst-msc-l1-mbi FROM csu-harbor.csst.nao:10443/csst/csst-msc-l1-mbi
USER root USER root
COPY run.sh /app/bin/ COPY run.sh /app/bin/
......
IMAGE_NAME:=csu-harbor.csst.nao:10443/cnic/csst-msc-l1-mbi IMAGE_NAME:=csu-harbor.csst.nao:10443/cnic/csst-msc-l1-mbi
IMAGE_PATH:=/mnt/tmp/scalebox-images
all: build push dist all: build push dist
build: build:
docker pull zjlab-harbor.csst.nao:10443/csst/csst-msc-l1-mbi docker pull csu-harbor.csst.nao:10443/csst/csst-msc-l1-mbi
docker build --network=host -t $(IMAGE_NAME) . docker build --network=host -t $(IMAGE_NAME) .
push: push:
docker push $(IMAGE_NAME) docker push $(IMAGE_NAME)
......
...@@ -5,6 +5,20 @@ rm -rf /work/* ...@@ -5,6 +5,20 @@ rm -rf /work/*
message=$1 message=$1
echo "message: "$message echo "message: "$message
headers=$(echo "$2" | sed 's/\\//g')
echo "$headers" | jq '.'
echo "headers: $headers"
has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
data_list=$(echo "$headers" | jq -r '.data_list')
fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
# 将 data_list 重新插入回 message
message=$(echo "$message" | jq --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
fi
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $message python /pipeline/app/run.py $message
...@@ -48,9 +62,14 @@ else ...@@ -48,9 +62,14 @@ else
fi fi
if [ $exit_code -eq 0 ]; then if [ $exit_code -eq 0 ]; then
echo "finish mbi, start sink-job." if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
strdatalist=$(echo "$fixed_data_list" | tr -d '[:space:]')
scalebox task add --header sorted_tag=${sorted_tag} --header dag_run_id=${dag_run_id} --header data_list=${strdatalist} --header repeatable=yes --upsert $1
else
scalebox task add --header sorted_tag=${sorted_tag} --header dag_run_id=${dag_run_id} --header repeatable=yes --upsert ${message} scalebox task add --header sorted_tag=${sorted_tag} --header dag_run_id=${dag_run_id} --header repeatable=yes --upsert ${message}
# scalebox task add $1 fi
echo "finish mbi, start sink-job."
else else
echo "finish mbi, exit_code: $exit_code" echo "finish mbi, exit_code: $exit_code"
fi fi
......
...@@ -5,7 +5,20 @@ rm -rf /work/* ...@@ -5,7 +5,20 @@ rm -rf /work/*
message=$1 message=$1
echo "message: "$message echo "message: "$message
headers=$(echo "$2" | sed 's/\\//g')
echo "$headers" | jq '.'
echo "headers: $headers"
has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
data_list=$(echo "$headers" | jq -r '.data_list')
fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
# 将 data_list 重新插入回 message
message=$(echo "$message" | jq --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
fi
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $message python /pipeline/app/run.py $message
...@@ -34,14 +47,6 @@ else ...@@ -34,14 +47,6 @@ else
dag_run_id="" dag_run_id=""
fi fi
echo "dag_run_id:$dag_run_id" >> /work/extra-attributes.txt echo "dag_run_id:$dag_run_id" >> /work/extra-attributes.txt
# pattern='"sorted_tag":"([^"]+)"'
# if [[ $headers =~ $pattern ]]; then
# sorted_tag="${BASH_REMATCH[1]}"
# echo "sorted_tag: $sorted_tag"
# else
# # no sorted_tag in json
# sorted_tag=1
# fi
if [ $exit_code -eq 0 ]; then if [ $exit_code -eq 0 ]; then
echo "finish ooc." echo "finish ooc."
......
...@@ -2,7 +2,20 @@ ...@@ -2,7 +2,20 @@
message=$1 message=$1
echo "message: "$message echo "message: "$message
headers=$(echo "$2" | sed 's/\\//g')
echo "$headers" | jq '.'
echo "headers: $headers"
has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
data_list=$(echo "$headers" | jq -r '.data_list')
fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
# 将 data_list 重新插入回 message
message=$(echo "$message" | jq --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
fi
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $message python /pipeline/app/run.py $message
......
...@@ -5,6 +5,20 @@ rm -rf /work/* ...@@ -5,6 +5,20 @@ rm -rf /work/*
message=$1 message=$1
echo "message: "$message echo "message: "$message
headers=$(echo "$2" | sed 's/\\//g')
echo "$headers" | jq '.'
echo "headers: $headers"
has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
data_list=$(echo "$headers" | jq -r '.data_list')
fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
# 将 data_list 重新插入回 message
message=$(echo "$message" | jq --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
fi
cd /pipeline/output/ cd /pipeline/output/
python /pipeline/app/run.py $message python /pipeline/app/run.py $message
......
IMAGE_NAME:=csst/message-router-mbi IMAGE_NAME:=csst/message-router-hstdm
build: build:
docker build --network=host -t $(IMAGE_NAME) . docker build --network=host -t $(IMAGE_NAME) .
......
...@@ -17,12 +17,23 @@ class messageRoute(): ...@@ -17,12 +17,23 @@ class messageRoute():
dag = os.getenv("DAG_NAME", "csst-msc-l1-mbi") dag = os.getenv("DAG_NAME", "csst-msc-l1-mbi")
dagfile = "/dag-yaml/"+dag+".yml" dagfile = "/dag-yaml/"+dag+".yml"
if headers =="null" or "from_job" not in headers: fixed_headers = headers.replace('\\', '')
fixed_headers = fixed_headers.replace('"["', '["').replace('"]"', '"]')
print(f"fixed_headers: {headers}")
# 改进的JSON解析逻辑
try:
headersstr = json.loads(fixed_headers)
except json.JSONDecodeError as e:
print(f"Invalid JSON format in headers: {e}, headers: {fixed_headers}")
return
if headers =="null" or "from_job" not in headersstr:
print("received redis-cli message") print("received redis-cli message")
headersstr = json.loads(headers)
dag_run_id=headersstr["dag_run_id"] dag_run_id=headersstr["dag_run_id"]
sorted_tag=headersstr["sorted_tag"] sorted_tag=headersstr["sorted_tag"]
messageRoute.appready(dagfile,sorted_tag,dag_run_id,message) data_list=headersstr.get("data_list")
messageRoute.appready(dagfile,sorted_tag,dag_run_id,data_list,message)
else: else:
try: try:
headersstr = json.loads(headers) headersstr = json.loads(headers)
...@@ -30,13 +41,14 @@ class messageRoute(): ...@@ -30,13 +41,14 @@ class messageRoute():
#from_ip=headersstr["from_ip"] #from_ip=headersstr["from_ip"]
dag_run_id=headersstr["dag_run_id"] dag_run_id=headersstr["dag_run_id"]
sorted_tag=headersstr["sorted_tag"] sorted_tag=headersstr["sorted_tag"]
print("from_job :"+from_job+" dag_run_id :"+dag_run_id+" sorted_tag :"+sorted_tag) data_list = headersstr.get("data_list")
messageRoute.sendsinkjobs(dagfile,sorted_tag,dag_run_id,from_job,message) print("from_job :"+from_job+" dag_run_id :"+dag_run_id+" sorted_tag :"+sorted_tag+"data_list :"+data_list)
messageRoute.sendsinkjobs(dagfile,sorted_tag,dag_run_id,data_list,from_job,message)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
print("Invalid JSON format in headers:", e) print("Invalid JSON format in headers:", e)
@classmethod @classmethod
def appready(self,dagfile,sorted_tag,dag_run_id,message): def appready(self,dagfile,sorted_tag,dag_run_id,data_list,message):
#解析对应的DAG文件 #解析对应的DAG文件
with open(dagfile, "r", encoding='utf-8') as f: with open(dagfile, "r", encoding='utf-8') as f:
data = yaml.safe_load(f) data = yaml.safe_load(f)
...@@ -46,10 +58,10 @@ class messageRoute(): ...@@ -46,10 +58,10 @@ class messageRoute():
print(f"任务 '{task['name']}' 没有 dependencies 字段。") print(f"任务 '{task['name']}' 没有 dependencies 字段。")
sink_job=task['image'] sink_job=task['image']
print("The header job is "+sink_job) print("The header job is "+sink_job)
messageRoute.sendmsg(sorted_tag,dag_run_id,sink_job,message) messageRoute.sendmsg(sorted_tag,dag_run_id,data_list,sink_job,message)
@classmethod @classmethod
def sendsinkjobs(self,dagfile,sorted_tag,dag_run_id,from_job,message): def sendsinkjobs(self,dagfile,sorted_tag,dag_run_id,data_list,from_job,message):
#解析对应的DAG文件 #解析对应的DAG文件
#from_job='csst-msc-l1-mbi' #from_job='csst-msc-l1-mbi'
with open(dagfile, "r", encoding='utf-8') as f: with open(dagfile, "r", encoding='utf-8') as f:
...@@ -66,15 +78,23 @@ class messageRoute(): ...@@ -66,15 +78,23 @@ class messageRoute():
if from_job in dependencies: if from_job in dependencies:
sink_job = task.get('image') sink_job = task.get('image')
if sink_job: if sink_job:
messageRoute.sendmsg(sorted_tag,dag_run_id,sink_job,message) messageRoute.sendmsg(sorted_tag,dag_run_id,data_list,sink_job,message)
@classmethod @classmethod
def sendmsg(self,sorted_tag,dag_run_id,job,message): def sendmsg(self,sorted_tag,dag_run_id,data_list,job,message):
#执行解包操作 #执行解包操作
print("sendmsg dag_run_id is "+dag_run_id) print("sendmsg dag_run_id is "+dag_run_id)
messageRoute.append_dagrunid(dag_run_id) messageRoute.append_dagrunid(dag_run_id)
message = "'"+message+"'" message = "'"+message+"'"
if not data_list:
print("data_list 是 None 或空值")
#header中没有data_list参数时,下发消息也不带--header data_list
command = f"scalebox task add --header sorted_tag={sorted_tag} --header dag_run_id={dag_run_id} --header repeatable=yes --upsert --sink-job={job} {message}" command = f"scalebox task add --header sorted_tag={sorted_tag} --header dag_run_id={dag_run_id} --header repeatable=yes --upsert --sink-job={job} {message}"
else:
print("data_list 有值")
command = f"scalebox task add --header sorted_tag={sorted_tag} --header dag_run_id={dag_run_id} --header repeatable=yes --header data_list={data_list} --upsert --sink-job={job} {message}"
print("command : "+command) print("command : "+command)
result=subprocess.run(command, shell=True) result=subprocess.run(command, shell=True)
if result.returncode == 0: if result.returncode == 0:
......
csst-msc-l1-mbi 21 csst-msc-l1-mbi 60
\ No newline at end of file csst-msc-l1-sls 63
csst-msc-l1-qc0 62
csst-cpic-l1 50
csst-cpic-l1-qc0 51
csst-hstdm-l1 52
csst-ifs-l1-qc0 53
csst-ifs-l1-rss 54
csst-mci-l1 55
csst-msc-l1-ooc 61
csst-msc-l1-ast 50
csst-ifs-l1 49
#!/bin/bash #!/bin/bash
# json串 # 设置日志轮转
# "{\"dag_id\": \"csst-msc-l1-mbi\", \"dag_run_id\": \"202411111505581036\", \"message\": {\"obsid\": \"11009101682009\", \"chipid\": \"24\"}}" LOG_FILE="/logs/msgs.txt"
# { MAX_LOG_SIZE=$((100 * 1024 * 1024)) # 100MB
# "priority": 1, MAX_LOG_FILES=5
# "dag_id": "csst-msc-l1-mbi",
# "dag_run_id": "12345", rotate_logs() {
# "dataset": "csst-msc-c9-25sqdeg-v3", if [[ -f "$LOG_FILE" && $(stat -f%z "$LOG_FILE" 2>/dev/null || stat -c%s "$LOG_FILE" 2>/dev/null) -gt $MAX_LOG_SIZE ]]; then
# "obs_type": "WIDE", for i in $(seq $((MAX_LOG_FILES-1)) -1 1); do
# "project_id": "none", [[ -f "${LOG_FILE}.$i" ]] && mv "${LOG_FILE}.$i" "${LOG_FILE}.$((i+1))"
# "obs_id": "10100543790", done
# "chip_id": "09", mv "$LOG_FILE" "${LOG_FILE}.1"
# "batch_id": "csst-msc-c9-25-sqdeg-v3-rdx1" touch "$LOG_FILE"
# } fi
}
extract_json_field() {
jq -r "$1" <<< "$2"
}
check_data_list() {
jq 'has("data_list")' <<< "$1"
}
# 20250621
# {
# "dag_group": "csst_dag.cli.msc_l1",
# "dag_group_run": "195244ff176f923aec9a9328c75ecaeb4a8c4345",
# "dag": "csst-msc-l1-mbi",
# "dag_run": "c89d7e7a022e6f0cdf1daff921c29dbce0ac7c01",
# "batch_id": "inttest",
# "priority": 1,
# "dataset": "csst-msc-c9-25sqdeg-v3",
# "obs_type": "WIDE",
# "obs_group": "W2",
# "obs_id": "10100232366",
# "detector": "09"
# }
while true while true
do do
# obsid=$(redis-cli -h ${REDIS_SERVER} -p ${REDIS_PORT} rpop single-image-reduction:obsid)
msg=$(redis-cli -h ${REDIS_SERVER} -p ${REDIS_PORT} -a ${REDIS_PWD} -n ${REDIS_DB} rpop csst_data_list) msg=$(redis-cli -h ${REDIS_SERVER} -p ${REDIS_PORT} -a ${REDIS_PWD} -n ${REDIS_DB} rpop csst_data_list)
#msg=$(redis-cli -h 192.168.25.205 -p 26379 -a 123456 rpop csst_data_list)
echo "msg : "$msg echo "msg : "$msg
if [ -z "$msg" ]; then if [ -z "$msg" ]; then
echo "msg is empty" code=102
code = 102
else else
# 判断是否存在 data_list 字段
has_data_list=$(check_data_list "$msg")
if [ "$has_data_list" = "true" ]; then
# 提取 data_list 字段的值
data_list=$(extract_json_field '.data_list' "$msg")
# 提取除了 data_list 之外的其他字段
msg=$(extract_json_field 'del(.data_list)' "$msg")
fi
echo "msg is not empty" $msg echo "msg is not empty" $msg
# 提取dag_id 对应app_id # 提取dag_id 对应app_id
dag_id=$(echo "$msg" | jq -r '.dag') dag_id=$(extract_json_field '.dag' "$msg")
echo "dag_id : "$dag_id
#检索dag_id与app_id对应关系的配置文件appconfig.txt #检索dag_id与app_id对应关系的配置文件appconfig.txt
app_id=$(awk -v key="$dag_id" '$1 == key {print $2}' /config/appconfig.txt) app_id=$(awk -v key="$dag_id" '$1 == key {print $2}' /config/appconfig.txt)
# 检查是否找到app_id # 检查是否找到app_id
if [ -n "$app_id" ]; then if [ -z "$app_id" ]; then
echo "The appid for $dag_id is $app_id" code=103
else sleep 0.1 # 增加等待时间
echo "$dag_id not found in appconfig.txt" continue
code = 103
fi fi
# 提取dag_run_id # 提取dag_run_id
dag_run_id=$(echo "$msg" | jq -r '.dag_run') dag_run_id=$(extract_json_field '.dag_run' "$msg")
echo "dag_run_id : "$dag_run_id
# message=$(echo "$msg" | jq -c '.message')
message=$(echo "$msg" | tr -d '[:space:]') message=$(echo "$msg" | tr -d '[:space:]')
# 提取message # 日志轮转
#echo "$message" >> "msgs.txt" rotate_logs
echo "$(date) massage :"$message >> "$LOG_FILE"
echo "$(date) massage :"$message >> "/logs/msgs.txt"
#send-message $obsid #send-message $obsid
job_name="message-router-csst" job_name="message-router-csst"
...@@ -76,10 +73,25 @@ do ...@@ -76,10 +73,25 @@ do
priority=$(expr $num - ${priority%%[^1-9]*}) priority=$(expr $num - ${priority%%[^1-9]*})
echo "priority : "$priority echo "priority : "$priority
scalebox task add --app-id ${app_id} --header sorted_tag=${priority} --header dag_run_id=${dag_run_id} --header repeatable=yes --upsert --sink-job ${job_name} ${message} if [ "$has_data_list" = "true" ]; then
strdatalist=$(echo "$data_list" | tr -d '[:space:]')
echo "has data_list :"$strdatalist
scalebox task add --app-id ${app_id} \
--header sorted_tag=${priority} \
--header dag_run_id=${dag_run_id} \
--header repeatable=yes \
--header data_list=${strdatalist} \
--upsert --sink-job ${job_name} ${message}
else
scalebox task add --app-id ${app_id} \
--header sorted_tag=${priority} \
--header dag_run_id=${dag_run_id} \
--header repeatable=yes \
--upsert --sink-job ${job_name} ${message}
fi
code=$? code=$?
fi fi
echo $code echo $code
sleep 0.001 sleep 0.01
done done
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment