#!/bin/bash # json串 # "{\"dag_id\": \"csst-msc-l1-mbi\", \"dag_run_id\": \"202411111505581036\", \"message\": {\"obsid\": \"11009101682009\", \"chipid\": \"24\"}}" # { # "priority": 1, # "dag_id": "csst-msc-l1-mbi", # "dag_run_id": "12345", # "dataset": "csst-msc-c9-25sqdeg-v3", # "obs_type": "WIDE", # "project_id": "none", # "obs_id": "10100543790", # "chip_id": "09", # "batch_id": "csst-msc-c9-25-sqdeg-v3-rdx1" # } # 20250621 # { # "dag_group": "csst_dag.cli.msc_l1", # "dag_group_run": "195244ff176f923aec9a9328c75ecaeb4a8c4345", # "dag": "csst-msc-l1-mbi", # "dag_run": "c89d7e7a022e6f0cdf1daff921c29dbce0ac7c01", # "batch_id": "inttest", # "priority": 1, # "dataset": "csst-msc-c9-25sqdeg-v3", # "obs_type": "WIDE", # "obs_group": "W2", # "obs_id": "10100232366", # "detector": "09" # } while true do # obsid=$(redis-cli -h ${REDIS_SERVER} -p ${REDIS_PORT} rpop single-image-reduction:obsid) msg=$(redis-cli -h ${REDIS_SERVER} -p ${REDIS_PORT} -a ${REDIS_PWD} -n ${REDIS_DB} rpop csst_data_list) echo "msg : "$msg if [ -z "$msg" ]; then echo "msg is empty" code = 102 else echo "msg is not empty" $msg # 提取dag_id 对应app_id dag_id=$(echo "$msg" | jq -r '.dag') echo "dag_id : "$dag_id #检索dag_id与app_id对应关系的配置文件appconfig.txt app_id=$(awk -v key="$dag_id" '$1 == key {print $2}' /config/appconfig.txt) # 检查是否找到app_id if [ -n "$app_id" ]; then echo "The appid for $dag_id is $app_id" else echo "$dag_id not found in appconfig.txt" code = 103 fi # 提取dag_run_id dag_run_id=$(echo "$msg" | jq -r '.dag_run') echo "dag_run_id : "$dag_run_id # message=$(echo "$msg" | jq -c '.message') message=$(echo "$msg" | tr -d '[:space:]') # 提取message #echo "$message" >> "msgs.txt" echo "$(date) massage :"$message >> "/logs/msgs.txt" #send-message $obsid job_name="message-router-csst" priority=$(echo "$msg" | jq -r '.priority') if [[ ! "$priority" =~ ^[1-9]$ ]]; then priority="1" fi num=10 priority=$(expr $num - ${priority%%[^1-9]*}) echo "priority : "$priority scalebox task add --app-id ${app_id} --header sort_tag=${priority} --header dag_run_id=${dag_run_id} --header repeatable=yes --upsert --sink-job ${job_name} ${message} code=$? fi echo $code sleep 0.001 done