#!/bin/bash # json串 # "{\"dag_id\": \"csst-msc-l1-mbi\", \"parameters\": {\"dag_run_id\": \"202411111505581036\", \"message\": {\"obsid\": \"11009101682009\", \"chipid\": \"24\"}}}" while true do # obsid=$(redis-cli -h ${REDIS_SERVER} -p ${REDIS_PORT} rpop single-image-reduction:obsid) msg=$(redis-cli -h ${REDIS_SERVER} -p ${REDIS_PORT} -a ${REDIS_PWD} rpop csst_data_list) # msg=$(redis-cli -h 10.3.10.28 -a 123456 lpop csst_data_list_zxl) #redis-cli -h 10.3.10.28 -p 26379 -a "csst__2025" rpop csst_data_list echo "msg : "$msg if [ -z "$msg" ]; then echo "msg is empty" code = 102 else echo "msg is not empty" $msg # 提取dag_id 对应app_id dag_id=$(echo "$msg" | jq -r '.dag_id') echo "dag_id : "$dag_id #检索dag_id与app_id对应关系的配置文件appconfig.txt app_id=$(awk -v key="$dag_id" '$1 == key {print $2}' /config/appconfig.txt) # 检查是否找到app_id if [ -n "$app_id" ]; then echo "The appid for $dag_id is $app_id" else echo "$dag_id not found in appconfig.txt" code = 103 fi # 提取dag_run_id dag_run_id=$(echo "$msg" | jq -r '.dag_run_id') echo "dag_run_id : "$dag_run_id message=$(echo "$msg" | jq -c '.message') # 提取message #echo "$message" >> "msgs.txt" echo "$(date) massgae :"$message >> "msgs.txt" #send-message $obsid job_name="message-router-csst" scalebox task add --app-id ${app_id} --header dag_run_id=${dag_run_id} --header repeatable=yes --upsert --sink-job ${job_name} ${message} code=$? fi echo $code done