Commit dc6bdfaa authored by Zhang Xiaoli's avatar Zhang Xiaoli
Browse files

For scalebox version=2604170059, update the code. job > module

parent 5e54895c
IMAGE_NAME:=csst/message-router-mbi: IMAGE_NAME:=csst/message-router-mbi
build: build:
docker build --network=host -t $(IMAGE_NAME) . docker build --network=host -t $(IMAGE_NAME) .
......
...@@ -32,7 +32,7 @@ class messageRoute(): ...@@ -32,7 +32,7 @@ class messageRoute():
print(f"Invalid JSON format in headers: {e}, headers: {headersstr}") print(f"Invalid JSON format in headers: {e}, headers: {headersstr}")
return return
if headers =="null" or "from_job" not in headersstr: if headers =="null" or "from_module" not in headersstr:
print("received redis-cli message") print("received redis-cli message")
dag_run_id=headersstr["dag_run_id"] dag_run_id=headersstr["dag_run_id"]
sorted_tag=headersstr["sorted_tag"] sorted_tag=headersstr["sorted_tag"]
...@@ -41,13 +41,13 @@ class messageRoute(): ...@@ -41,13 +41,13 @@ class messageRoute():
else: else:
try: try:
# headersstr = json.loads(headers) # headersstr = json.loads(headers)
from_job=headersstr["from_job"] from_module=headersstr["from_module"]
#from_ip=headersstr["from_ip"] #from_ip=headersstr["from_ip"]
dag_run_id=headersstr["dag_run_id"] dag_run_id=headersstr["dag_run_id"]
sorted_tag=headersstr["sorted_tag"] sorted_tag=headersstr["sorted_tag"]
data_list = headersstr.get("data_list") data_list = headersstr.get("data_list")
print("from_job :"+from_job+" dag_run_id :"+dag_run_id+" sorted_tag :"+sorted_tag+"data_list :"+data_list) print("from_module :"+from_module+" dag_run_id :"+dag_run_id+" sorted_tag :"+sorted_tag+"data_list :"+data_list)
messageRoute.sendsinkjobs(dagfile,sorted_tag,dag_run_id,data_list,from_job,message) messageRoute.sendsinkmodules(dagfile,sorted_tag,dag_run_id,data_list,from_module,message)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
print("Invalid JSON format in headers:", e) print("Invalid JSON format in headers:", e)
...@@ -60,32 +60,32 @@ class messageRoute(): ...@@ -60,32 +60,32 @@ class messageRoute():
for task in tasks: for task in tasks:
if 'dependencies' not in task: if 'dependencies' not in task:
print(f"任务 '{task['name']}' 没有 dependencies 字段。") print(f"任务 '{task['name']}' 没有 dependencies 字段。")
sink_job=task['image'] sink_module=task['image']
print("The header job is "+sink_job) print("The header module is "+sink_module)
messageRoute.sendmsg(sorted_tag,dag_run_id,data_list,sink_job,message) messageRoute.sendmsg(sorted_tag,dag_run_id,data_list,sink_module,message)
@classmethod @classmethod
def sendsinkjobs(self,dagfile,sorted_tag,dag_run_id,data_list,from_job,message): def sendsinkmodules(self,dagfile,sorted_tag,dag_run_id,data_list,from_module,message):
#解析对应的DAG文件 #解析对应的DAG文件
#from_job='csst-msc-l1-mbi' #from_module='csst-msc-l1-mbi'
with open(dagfile, "r", encoding='utf-8') as f: with open(dagfile, "r", encoding='utf-8') as f:
data = yaml.safe_load(f) data = yaml.safe_load(f)
tasks = data.get('tasks', []) tasks = data.get('tasks', [])
# 先找到 from_job 对应的 image # 先找到 from_module 对应的 image
for task in tasks: for task in tasks:
if task.get('image') == from_job: if task.get('image') == from_module:
from_job = task.get('name') from_module = task.get('name')
print("The header job is "+from_job) print("The header module is "+from_module)
break break
for task in tasks: for task in tasks:
dependencies = task.get('dependencies', []) dependencies = task.get('dependencies', [])
if from_job in dependencies: if from_module in dependencies:
sink_job = task.get('image') sink_module = task.get('image')
if sink_job: if sink_module:
messageRoute.sendmsg(sorted_tag,dag_run_id,data_list,sink_job,message) messageRoute.sendmsg(sorted_tag,dag_run_id,data_list,sink_module,message)
@classmethod @classmethod
def sendmsg(self,sorted_tag,dag_run_id,data_list,job,message): def sendmsg(self,sorted_tag,dag_run_id,data_list,module,message):
#执行解包操作 #执行解包操作
print("sendmsg dag_run_id is "+dag_run_id) print("sendmsg dag_run_id is "+dag_run_id)
messageRoute.append_dagrunid(dag_run_id) messageRoute.append_dagrunid(dag_run_id)
...@@ -94,18 +94,18 @@ class messageRoute(): ...@@ -94,18 +94,18 @@ class messageRoute():
if not data_list: if not data_list:
print("data_list 是 None 或空值") print("data_list 是 None 或空值")
#header中没有data_list参数时,下发消息也不带--header data_list #header中没有data_list参数时,下发消息也不带--header data_list
command = f"scalebox task add --header sorted_tag={sorted_tag} --header dag_run_id={dag_run_id} --header repeatable=yes --upsert --sink-job={job} {message}" command = f"scalebox task add --header sorted_tag={sorted_tag} --header dag_run_id={dag_run_id} --header repeatable=yes --conflict-action=OVERWRITE --sink-module={module} {message}"
else: else:
print("data_list 有值") print("data_list 有值")
# data_list 去掉空格 # data_list 去掉空格
str_data_list=str(data_list) str_data_list=str(data_list)
cleaned_data_list = str_data_list.replace(' ', '') cleaned_data_list = str_data_list.replace(' ', '')
command = f"scalebox task add --header sorted_tag={sorted_tag} --header dag_run_id={dag_run_id} --header repeatable=yes --header data_list={cleaned_data_list} --upsert --sink-job={job} {message}" command = f"scalebox task add --header sorted_tag={sorted_tag} --header dag_run_id={dag_run_id} --header repeatable=yes --header data_list={cleaned_data_list} --conflict-action=OVERWRITE --sink-module={module} {message}"
print("command : "+command) print("command : "+command)
result=subprocess.run(command, shell=True) result=subprocess.run(command, shell=True)
if result.returncode == 0: if result.returncode == 0:
print(f"send message {message} to {job}") print(f"send message {message} to {module}")
# dag_run_id记入/work/extra-attributes.txt # dag_run_id记入/work/extra-attributes.txt
print("命令执行成功") print("命令执行成功")
return result.returncode return result.returncode
......
...@@ -9,8 +9,12 @@ dist: ...@@ -9,8 +9,12 @@ dist:
push: push:
docker push $(IMAGE_NAME) docker push $(IMAGE_NAME)
run-bash: run-bash:
docker run -it --entrypoint bash -e PGHOST=192.168.25.205:5432 -e GRPC_SERVER=192.168.25.205 -v ${PWD}/config:/config $(IMAGE_NAME) docker run -it --entrypoint bash -e PGHOST=192.168.25.18:5432 -e GRPC_SERVER=192.168.25.18 -v ${PWD}/config:/config $(IMAGE_NAME)
run: run:
docker run -d -e PGHOST=192.168.25.205:5432 -e GRPC_SERVER=192.168.25.205 -v ${PWD}/config:/config -v /nfs/scalebox/redis-cli:/logs $(IMAGE_NAME) docker run -d \
--log-driver json-file \
--log-opt max-size=2000m \
--log-opt max-file=5 \
-e PGHOST=192.168.25.18:5432 -e GRPC_SERVER=192.168.25.18 -v ${PWD}/config:/config -v /nfs/scalebox/sc3/redis-cli:/logs $(IMAGE_NAME)
down: down:
docker stop $(IMAGE_NAME) docker stop $(IMAGE_NAME)
...@@ -43,7 +43,7 @@ do ...@@ -43,7 +43,7 @@ do
msg=$(extract_json_field 'del(.data_list)' "$msg") msg=$(extract_json_field 'del(.data_list)' "$msg")
fi fi
echo "msg is not empty" $msg # echo "msg is not empty" $msg
# 提取dag_id 对应app_id # 提取dag_id 对应app_id
dag_id=$(extract_json_field '.dag' "$msg") dag_id=$(extract_json_field '.dag' "$msg")
#检索dag_id与app_id对应关系的配置文件appconfig.txt #检索dag_id与app_id对应关系的配置文件appconfig.txt
...@@ -62,9 +62,9 @@ do ...@@ -62,9 +62,9 @@ do
# 日志轮转 # 日志轮转
rotate_logs rotate_logs
echo "$(date) massage :"$allmsg >> "$LOG_FILE" echo "$(date) message :"$allmsg >> "$LOG_FILE"
#send-message $obsid #send-message $obsid
job_name="message-router-csst" module_name="message-router-csst"
priority=$(echo "$msg" | jq -r '.priority') priority=$(echo "$msg" | jq -r '.priority')
if [[ ! "$priority" =~ ^[1-9]$ ]]; then if [[ ! "$priority" =~ ^[1-9]$ ]]; then
...@@ -75,20 +75,19 @@ do ...@@ -75,20 +75,19 @@ do
echo "priority : "$priority echo "priority : "$priority
if [ "$has_data_list" = "true" ]; then if [ "$has_data_list" = "true" ]; then
strdatalist=$(echo "$data_list" | tr -d '[:space:]') data_list=$(echo "$data_list" | tr -d '[:space:]')
strdatalist=$(echo "$data_list" | sed 's/"/\\"/g')
echo "has data_list :"$strdatalist echo "has data_list :"$strdatalist
scalebox task add --app-id ${app_id} \ strheaders="{\"sorted_tag\":\"${priority}\",\"dag_run_id\":\"${dag_run_id}\",\"repeatable\":\"yes\",\"data_list\":\"${strdatalist}\"}"
--header sorted_tag=${priority} \ echo "strheaders: "$strheaders
--header dag_run_id=${dag_run_id} \ scalebox task add --app-id ${app_id} --headers "${strheaders}" --conflict-action=OVERWRITE --sink-module ${module_name} ${message}
--header repeatable=yes \
--header data_list=${strdatalist} \
--upsert --sink-job ${job_name} ${message}
else else
scalebox task add --app-id ${app_id} \ scalebox task add --app-id ${app_id} \
--header sorted_tag=${priority} \ --header sorted_tag=${priority} \
--header dag_run_id=${dag_run_id} \ --header dag_run_id=${dag_run_id} \
--header repeatable=yes \ --header repeatable=yes \
--upsert --sink-job ${job_name} ${message} --conflict-action=OVERWRITE --sink-module ${module_name} ${message}
fi fi
code=$? code=$?
......
...@@ -16,4 +16,6 @@ ...@@ -16,4 +16,6 @@
# redis-cli -h 192.168.25.205 -p 26379 -a 123456 rpush csst_data_list '{"dag_group":"default","dag_group_run":"f5b2f3d3cb64ec53a3f26c9c7502d255ff95439e","batch_id":"csci-1110-gn06-ylt01-130pmap","dag":"csst-msc-l1-mbi","dag_run":"2cf1a537395cb25b7c8d084f4b5560796ca21395","dataset":"csst-msc-c11-csci-ptc-v1","instrument":"MSC","obs_type":"WIDE","obs_group":"ksc-pzcs-sj1-gn06-ylt01","obs_id":"10100543249","detector":"18","filter":"","object":"","proposal_id":"","custom_id":"","priority":1,"data_list":["6904545f537c6b2ec572e504"],"extra_kwargs":{},"rerun":-1,"status_code":-1024,"queue_time":"2025-11-10T12:05:49.276827","pmapname":"csst_000130.pmap","ref_cat":"","created_time":"2025-11-10T04:05:49.188","n_file_expected":1,"n_file_found":1}' # redis-cli -h 192.168.25.205 -p 26379 -a 123456 rpush csst_data_list '{"dag_group":"default","dag_group_run":"f5b2f3d3cb64ec53a3f26c9c7502d255ff95439e","batch_id":"csci-1110-gn06-ylt01-130pmap","dag":"csst-msc-l1-mbi","dag_run":"2cf1a537395cb25b7c8d084f4b5560796ca21395","dataset":"csst-msc-c11-csci-ptc-v1","instrument":"MSC","obs_type":"WIDE","obs_group":"ksc-pzcs-sj1-gn06-ylt01","obs_id":"10100543249","detector":"18","filter":"","object":"","proposal_id":"","custom_id":"","priority":1,"data_list":["6904545f537c6b2ec572e504"],"extra_kwargs":{},"rerun":-1,"status_code":-1024,"queue_time":"2025-11-10T12:05:49.276827","pmapname":"csst_000130.pmap","ref_cat":"","created_time":"2025-11-10T04:05:49.188","n_file_expected":1,"n_file_found":1}'
redis-cli -h 192.168.25.205 -p 26379 -a 123456 rpush csst_data_list "{\"dag_group\":\"default\",\"dag_group_run\":\"b449ea3ba240ad42052aae7f3e3a0deb5772ef0d\",\"batch_id\":\"csci-test-hstdm-stare-001\",\"dag\":\"csst-hstdm-l1\",\"dag_run\":\"b138a18ba5a5c123aeff8f695ec378cfed571603\",\"dataset\":\"csst-hstdm-c11-stare-sis1-v1\",\"instrument\":\"HSTDM\",\"obs_type\":\"STARE\",\"obs_group\":\"default\",\"obs_id\":\"\",\"detector\":\"\",\"filter\":\"\",\"object\":\"\",\"proposal_id\":\"\",\"custom_id\":\"\",\"priority\":1,\"data_list\":[\"687b06060545d4e87daa80d3\",\"687b060a016213e95e57e2db\",\"687b060308ab2531b7424afa\"],\"extra_kwargs\":{},\"rerun\":-1,\"status_code\":-1024,\"queue_time\":\"2025-10-30T15:52:44.879820\",\"pmapname\":\"\",\"ref_cat\":\"\",\"created_time\":\"2025-10-30T07:52:44.616\",\"n_file_expected\":516,\"n_file_found\":516}" #redis-cli -h 192.168.25.205 -p 26379 -a 123456 rpush csst_data_list "{\"dag_group\":\"default\",\"dag_group_run\":\"b449ea3ba240ad42052aae7f3e3a0deb5772ef0d\",\"batch_id\":\"csci-test-hstdm-stare-001\",\"dag\":\"csst-hstdm-l1\",\"dag_run\":\"b138a18ba5a5c123aeff8f695ec378cfed571603\",\"dataset\":\"csst-hstdm-c11-stare-sis1-v1\",\"instrument\":\"HSTDM\",\"obs_type\":\"STARE\",\"obs_group\":\"default\",\"obs_id\":\"\",\"detector\":\"\",\"filter\":\"\",\"object\":\"\",\"proposal_id\":\"\",\"custom_id\":\"\",\"priority\":1,\"data_list\":[\"687b06060545d4e87daa80d3\",\"687b060a016213e95e57e2db\",\"687b060308ab2531b7424afa\"],\"extra_kwargs\":{},\"rerun\":-1,\"status_code\":-1024,\"queue_time\":\"2025-10-30T15:52:44.879820\",\"pmapname\":\"\",\"ref_cat\":\"\",\"created_time\":\"2025-10-30T07:52:44.616\",\"n_file_expected\":516,\"n_file_found\":516}"
\ No newline at end of file
redis-cli -h 192.168.25.18 -p 26379 -a csst__2025 rpush csst_data_list "{"dag_group":"default","dag_group_run":"d514c894c9a966666ad677969f416290ae5c1f6d","batch_id":"esc1-test-20260212","dag":"csst-cpic-l1","dag_run":"9cb8eeb0c4e8822e22cbda63aadcd00ae136c5be","dataset":"csst-cpic-c11-ppltest-v1","instrument":"CPIC","obs_type":"SCI","obs_group":"hip71681","obs_id":"40100003705","detector":"VIS","filter":"","object":"","proposal_id":"","custom_id":"","priority":1,"extra_kwargs":{},"rerun":0,"status_code":-1024,"queue_time":"2026-02-12T14:12:09.854312","create_time":"2026-02-12T14:12:09.854312","pmapname":"","ref_cat":"","created_time":"2026-02-12T06:12:09.756","n_file_expected":1,"n_file_found":1}"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment