#!/bin/bash
#TRACE=yes

# Temporary workaround: clear leftovers from previous runs.
rm -rf /work/*

message=$1
echo "message: $message"

# Strip backslash escapes so the headers parse as plain JSON.
headers=$(echo "$2" | sed 's/\\//g')
echo "$headers" | jq '.'
echo "headers: $headers"

has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
    # Extract the value of the data_list field.
    data_list=$(echo "$headers" | jq -r '.data_list')
    # Wrap each bare 24-character hex ID in double quotes so the list is valid JSON.
    fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
    # Insert data_list back into message (compact output keeps it on one line).
    message=$(echo "$message" | jq -c --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
fi

cd /pipeline/output/ || exit 1
# Quote the message so JSON containing spaces is passed as a single argument.
python /pipeline/app/run.py "$message"
exit_code=$?

# Collect the pipeline log and the last 100 lines of the module log.
echo "=====pipeline.log====" > /work/custom-out.txt
cat /pipeline/output/pipeline.log >> /work/custom-out.txt
echo "======module.log======" >> /work/custom-out.txt
tail -n 100 /pipeline/output/module.log >> /work/custom-out.txt

timefile=/pipeline/output/timestamp.txt
if test -f "$timefile"; then
    echo "$timefile exists"
    mv "$timefile" /work/timestamps.txt
fi

# Pull dag_run_id out of the raw (unstripped) headers.
headers=$2
pattern='"dag_run_id":"([^"]+)"'
if [[ $headers =~ $pattern ]]; then
    dag_run_id="${BASH_REMATCH[1]}"
    echo "dag_run_id: $dag_run_id"
else
    # No dag_run_id in the JSON.
    dag_run_id=""
fi
echo "dag_run_id:$dag_run_id" >> /work/extra-attributes.txt

if [ "$exit_code" -eq 0 ]; then
    echo "finish ast-sso."
    # scalebox task add $1
else
    echo "finish ast-sso, exit_code: $exit_code"
fi

rm -rf /pipeline/input/* /pipeline/output/*

exit "$exit_code"