#!/bin/bash # 设置日志轮转 LOG_FILE="/logs/msgs.txt" MAX_LOG_SIZE=$((100 * 1024 * 1024)) # 100MB MAX_LOG_FILES=5 rotate_logs() { if [[ -f "$LOG_FILE" && $(stat -f%z "$LOG_FILE" 2>/dev/null || stat -c%s "$LOG_FILE" 2>/dev/null) -gt $MAX_LOG_SIZE ]]; then for i in $(seq $((MAX_LOG_FILES-1)) -1 1); do [[ -f "${LOG_FILE}.$i" ]] && mv "${LOG_FILE}.$i" "${LOG_FILE}.$((i+1))" done mv "$LOG_FILE" "${LOG_FILE}.1" touch "$LOG_FILE" fi } extract_json_field() { jq -r "$1" <<< "$2" } check_data_list() { jq 'has("data_list")' <<< "$1" } while true do msg=$(redis-cli -h ${REDIS_SERVER} -p ${REDIS_PORT} -a ${REDIS_PWD} -n ${REDIS_DB} rpop csst_data_list) #msg=$(redis-cli -h 192.168.25.205 -p 26379 -a 123456 rpop csst_data_list) echo "msg : "$msg if [ -z "$msg" ]; then code=102 else # 判断是否存在 data_list 字段 has_data_list=$(check_data_list "$msg") if [ "$has_data_list" = "true" ]; then # 提取 data_list 字段的值 data_list=$(extract_json_field '.data_list' "$msg") # 提取除了 data_list 之外的其他字段 msg=$(extract_json_field 'del(.data_list)' "$msg") fi echo "msg is not empty" $msg # 提取dag_id 对应app_id dag_id=$(extract_json_field '.dag' "$msg") #检索dag_id与app_id对应关系的配置文件appconfig.txt app_id=$(awk -v key="$dag_id" '$1 == key {print $2}' /config/appconfig.txt) # 检查是否找到app_id if [ -z "$app_id" ]; then code=103 sleep 0.1 # 增加等待时间 continue fi # 提取dag_run_id dag_run_id=$(extract_json_field '.dag_run' "$msg") message=$(echo "$msg" | tr -d '[:space:]') # 日志轮转 rotate_logs echo "$(date) massage :"$message >> "$LOG_FILE" #send-message $obsid job_name="message-router-csst" priority=$(echo "$msg" | jq -r '.priority') if [[ ! "$priority" =~ ^[1-9]$ ]]; then priority="1" fi num=10 priority=$(expr $num - ${priority%%[^1-9]*}) echo "priority : "$priority if [ "$has_data_list" = "true" ]; then strdatalist=$(echo "$data_list" | tr -d '[:space:]') echo "has data_list :"$strdatalist scalebox task add --app-id ${app_id} \ --header sorted_tag=${priority} \ --header dag_run_id=${dag_run_id} \ --header repeatable=yes \ --header data_list=${strdatalist} \ --upsert --sink-job ${job_name} ${message} else scalebox task add --app-id ${app_id} \ --header sorted_tag=${priority} \ --header dag_run_id=${dag_run_id} \ --header repeatable=yes \ --upsert --sink-job ${job_name} ${message} fi code=$? fi echo $code sleep 0.01 done