diff --git a/src/actuator/actuatotFeatures.go b/src/actuator/actuatotFeatures.go new file mode 100644 index 0000000000000000000000000000000000000000..3609333701f0f43bb9882e0ab07461e46f2a3002 --- /dev/null +++ b/src/actuator/actuatotFeatures.go @@ -0,0 +1,149 @@ +package actuator + +import ( + "context" + "github.com/golang/protobuf/ptypes/empty" + wrps "github.com/golang/protobuf/ptypes/wrappers" + pb "goagent/scalebox" + "google.golang.org/grpc" + "log" + "os" +) + + +var address =os.Getenv("grpc_server") +//todo +var logger,_=makeLog("./") + + +func GetRunnableSlotList() ([]*pb.Command,error) { + //test + address =os.Getenv("grpc_server") + + //connecting + conn, err := grpc.Dial(address, grpc.WithInsecure()) + if err != nil { + log.Printf("Connection error:%v", err) + logger.Printf("Connection error:%v", err) + return nil,err + } + defer conn.Close() + c := pb.NewBoxServiceClient(conn) + + res,err:=c.GetRunnableSlotList(context.Background(),&empty.Empty{}) + if err!=nil{ + log.Printf("getRunnableSlotList grpc error:%v",err) + logger.Printf("getRunnableSlotList grpc error:%v",err) + } + return res.Command,nil + +} + +func GetRunnableDaemonList() ([]*pb.Command,error) { + //connecting + conn, err := grpc.Dial(address, grpc.WithInsecure()) + if err != nil { + log.Printf("Connection error:%v", err) + logger.Printf("Connection error:%v", err) + return nil,err + } + defer conn.Close() + c := pb.NewBoxServiceClient(conn) + + res,err:=c.GetRunnableDaemonList(context.Background(),&empty.Empty{}) + if err!=nil{ + log.Printf("getRunnableDaemonList grpc error:%v",err) + logger.Printf("getRunnableDaemonList grpc error:%v",err) + } + return res.Command,nil +} + +func GetTerminatedDaemonList() ([]*pb.Command,error) { + //connecting + conn, err := grpc.Dial(address, grpc.WithInsecure()) + if err != nil { + log.Printf("Connection error:%v", err) + logger.Printf("Connection error:%v", err) + return nil,err + } + defer conn.Close() + c := pb.NewBoxServiceClient(conn) + + res,err:=c.GetTerminableDaemonList(context.Background(),&empty.Empty{}) + if err!=nil{ + log.Printf("getTerminatedDaemonList grpc error:%v",err) + logger.Printf("getTerminatedDaemonList grpc error:%v",err) + } + return res.Command,nil +} + + + + + + +func SetSlotInitialized(cmdPkg *pb.Command)error { + + //connecting + conn, err := grpc.Dial(address, grpc.WithInsecure()) + if err != nil { + log.Printf("Connection error:%v", err) + logger.Printf("Connection error:%v", err) + return err + } + defer conn.Close() + c := pb.NewBoxServiceClient(conn) + retCode,err:=c.SetSlotInitialized(context.Background(),&wrps.Int32Value{Value: cmdPkg.Id}) + if err!=nil{ + log.Printf("setSlotInit grpc error:%v",err) + logger.Printf("setSlotInit grpc error:%v",err) + return err + } + logger.Printf("recode:%v",retCode) + return nil + } + + + +func SetRunnableDaemonInitialized(cmdPkg *pb.Command)error { + //connecting + conn, err := grpc.Dial(address, grpc.WithInsecure()) + if err != nil { + log.Printf("Connection error:%v", err) + logger.Printf("Connection error:%v", err) + return err + } + defer conn.Close() + c := pb.NewBoxServiceClient(conn) + retCode,err:=c.SetDaemonInitialized(context.Background(),&wrps.Int32Value{Value: cmdPkg.Id}) + if err!=nil{ + log.Printf("setDaemonInit grpc error:%v",err) + logger.Printf("setDaemonInit grpc error:%v",err) + return err + } + logger.Printf("recode:%v",retCode) + return nil +} + +func SetRunnableDaemonTerminated(cmdPkg *pb.Command)error{ + //connecting + conn, err := grpc.Dial(address, grpc.WithInsecure()) + if err != nil { + log.Printf("Connection error:%v", err) + logger.Printf("Connection error:%v", err) + return err + } + defer conn.Close() + c := pb.NewBoxServiceClient(conn) + retCode,err:=c.SetDaemonTerminated(context.Background(),&wrps.Int32Value{Value: cmdPkg.Id}) + if err!=nil{ + log.Printf("setDaemonTerminated grpc error:%v",err) + logger.Printf("setDaemonTerminated grpc error:%v",err) + return err + } + logger.Printf("recode:%v",retCode) + return nil + +} + + diff --git a/src/actuator/backup.txt b/src/actuator/backup.txt new file mode 100644 index 0000000000000000000000000000000000000000..7c13c2d9b531aa6809680e07e612ac60c3c3cad7 --- /dev/null +++ b/src/actuator/backup.txt @@ -0,0 +1,80 @@ +func GetRunnableSlotList() ([]*pb.Command,error) { + //connecting + conn, err := grpc.Dial(address, grpc.WithInsecure()) + if err != nil { + log.Printf("Connection error:%v", err) + logger.Printf("Connection error:%v", err) + return nil,err + } + defer conn.Close() + c := pb.NewBoxServiceClient(conn) + + res,err:=c.GetRunnableSlotList(context.Background(),&empty.Empty{}) + if err!=nil{ + log.Printf("getRunnableSlotList grpc error:%v",err) + logger.Printf("getRunnableSlotList grpc error:%v",err) + } + return res.Command,nil + +} + +func GetRunnableDaemonList() ([]*pb.Command,error) { + //connecting + conn, err := grpc.Dial(address, grpc.WithInsecure()) + if err != nil { + log.Printf("Connection error:%v", err) + logger.Printf("Connection error:%v", err) + return nil,err + } + defer conn.Close() + c := pb.NewBoxServiceClient(conn) + + res,err:=c.GetRunnableDaemonList(context.Background(),&empty.Empty{}) + if err!=nil{ + log.Printf("getRunnableDaemonList grpc error:%v",err) + logger.Printf("getRunnableDaemonList grpc error:%v",err) + } + return res.Command,nil +} + + +func GetList(tag string)([]*pb.Command,error) { + //for testing + address=os.Getenv("grpc_server") + //connecting + conn, err := grpc.Dial(address, grpc.WithInsecure()) + if err != nil { + log.Printf("Connection error:%v", err) + logger.Printf("Connection error:%v", err) + return nil,err + } + defer conn.Close() + c := pb.NewBoxServiceClient(conn) + + + switch tag { + case "runnableSlot": + res,err:=c.GetRunnableSlotList(context.Background(),&empty.Empty{}) + if err!=nil{ + log.Printf("getRunnableSlotList grpc error:%v",err) + logger.Printf("getRunnableSlotList grpc error:%v",err) + } + return res.Command,nil + case "runnableDaemon": + res,err:=c.GetRunnableDaemonList(context.Background(),&empty.Empty{}) + if err!=nil{ + log.Printf("getRunnableDaemonList grpc error:%v",err) + logger.Printf("getRunnableDaemonList grpc error:%v",err) + } + return res.Command,nil + case "terminatedDaemon": + res,err:=c.GetTerminatedDaemonList(context.Background(),&empty.Empty{}) + if err!=nil{ + log.Printf("getTerminatedDaemonList grpc error:%v",err) + logger.Printf("getTerminatedDaemonList grpc error:%v",err) + } + return res.Command,nil + } + return nil, errors.New("no such a tag") + +} \ No newline at end of file diff --git a/src/actuator/bussinessLogic.go b/src/actuator/bussinessLogic.go new file mode 100644 index 0000000000000000000000000000000000000000..edc8f567b7f11b3ed365d498a462d3f9c24f67f7 --- /dev/null +++ b/src/actuator/bussinessLogic.go @@ -0,0 +1,21 @@ +package actuator + +import pb "goagent/scalebox" + +/* +@Time : 2021/10/25 下午2:57 +@Author : Zhouyc +*/ + +func Instantiate(cmdPkg *pb.Command)error { + var factory DockerLauncherCreator + //todo with slurm + factory=new(ViaSSHCreator) + launcher:=factory.Create() + err:=launcher.launchD(cmdPkg.Host,cmdPkg.CommandText) + if err!=nil{ + return err + } + return nil + +} \ No newline at end of file diff --git a/src/actuator/dockerfactory.go b/src/actuator/dockerfactory.go new file mode 100644 index 0000000000000000000000000000000000000000..7a47b3e81f0c641e9340a6af55c015e279cccdd7 --- /dev/null +++ b/src/actuator/dockerfactory.go @@ -0,0 +1,43 @@ +package actuator + +import ( + "log" +) + +/* +@Time : 2021/10/21 下午3:19 +@Author : Zhouyc +*/ + +type DockerLauncher interface { + launchD(host string,cmd string) error +} +type DockerLauncherCreator interface { + Create() DockerLauncher +} + +type ViaSSH struct { +} +func (s *ViaSSH)launchD(host string,cmd string)error{ + cmdline:="ssh"+" "+host+" "+"\""+cmd+"\"" + timeout:=60 + + res,err:=launch(cmdline,timeout) + if err!=nil{ + log.Printf("ssh error:%v",err) + logger.Printf("ssh error:%v",err) + return err + } + log.Printf(res) + logger.Printf(res) + return nil +} + +type ViaSSHCreator struct { +} +func (sc *ViaSSHCreator)Create()DockerLauncher{ + l:=new(ViaSSH) + return l + +} + diff --git a/src/actuator/launch_test.go b/src/actuator/launch_test.go new file mode 100644 index 0000000000000000000000000000000000000000..d958935f29638b026301afb0cb4031828f41a0a9 --- /dev/null +++ b/src/actuator/launch_test.go @@ -0,0 +1,32 @@ +package actuator + +/* +@Time : 2021/10/25 下午7:07 +@Author : Zhouyc +*/ +import ( + "log" + "testing" +) + +func TestLaunch(t *testing.T) { + timeout:=5 + out,err:=launch("../shells/3s.sh",timeout) + if err!=nil{ + t.Error(err) + } + log.Print(out) + + out,err=launch("../shells/10s.sh",timeout) + if err!=nil { + if err.Error() != "sh script timeout" { + t.Error(err) + } + } + log.Print(out) + + + + + +} \ No newline at end of file diff --git a/src/actuator/launchshell.go b/src/actuator/launchshell.go new file mode 100644 index 0000000000000000000000000000000000000000..57a139547de72020c05e1407e363878687ba03c0 --- /dev/null +++ b/src/actuator/launchshell.go @@ -0,0 +1,70 @@ +package actuator + +/* +@Time : 2021/10/21 上午10:49 +@Author : Zhouyc +*/ +import ( + "bytes" + "errors" + "log" + "os/exec" +) +import "time" +//import "errors" + +func launch(command string,timeout int)(string,error) { + var shellErr=make(chan error) + var done=make(chan string) + + cmd:=exec.Command("/usr/bin/bash",command) + var stdout,stderr bytes.Buffer + cmd.Stdout=&stdout + cmd.Stderr=&stderr + + //launch + go func() { + err:=cmd.Run() + if err!=nil{ + shellErr <- err + log.Printf("Unable to execute shells:%v",err) + return + } + log.Printf("Shells ran successfully") + done<-stdout.String()+stderr.String() + + }() + + + // timeout==-1 means no need for timeout function + if timeout!=-1 { + select { + case <-time.After(time.Second * time.Duration(timeout)): + log.Printf("Shell scripts timeout") + err := cmd.Process.Kill() + if err!=nil{ + return "",err + } + return "", errors.New("sh script timeout") + case result := <-done: + log.Printf("stdout&stderr:%s", result) + return result, nil + case err := <-shellErr: + log.Printf("sh script error:%v", err) + return "", err + + } + } + select { + case result := <-done: + log.Printf("stdout&stderr:%s", result) + return result, nil + case err := <-shellErr: + log.Printf("error occured while launching:%v", err) + return "shell running error", err + + } + + + +} diff --git a/src/actuator/logger.go b/src/actuator/logger.go new file mode 100644 index 0000000000000000000000000000000000000000..1793f2bdb299357317fe561e082577193230ff2a --- /dev/null +++ b/src/actuator/logger.go @@ -0,0 +1,21 @@ +package actuator + +/* +@Time : 2021/10/21 上午11:19 +@Author : Zhouyc +*/ +import "os" +import "log" +import "time" + +func makeLog(path string) (*log.Logger ,error) { + logFile,err:=os.Create(path+time.Now().Format("2006-01-02")+".log") + if err!=nil{ + log.Printf("Cannot create logFile:%v",err) + return nil,err + } + logger:=log.New(logFile,"",log.Ldate|log.Ltime|log.Lshortfile) + return logger,nil + + +} diff --git a/src/actuator/main.go b/src/actuator/main.go new file mode 100644 index 0000000000000000000000000000000000000000..0d591b63c401b67ea56996f09568f99fa9586969 --- /dev/null +++ b/src/actuator/main.go @@ -0,0 +1,95 @@ +package actuator + +import ( + "log" + "time" +) + +/* +@Time : 2021/10/21 下午5:11 +@Author : Zhouyc +*/ + +func main() { + for { + + commandList, err :=GetRunnableSlotList() + if err != nil { + log.Printf("getRunnableSlotList error:%v", err) + logger.Printf("getRunnableSlotList error:%v", err) + }else { + + if len(commandList) != 0 { + for _, cmdPkg := range commandList { + cmdPkg := cmdPkg + go func() { + err:=Instantiate(cmdPkg) + if err!=nil{ + logger.Printf("Launch container failure:%v",err) + return + } + err = SetSlotInitialized(cmdPkg) + if err != nil { + logger.Printf("setSlotInit grpc error:%v", err) + return + } + }() + } + } + } + + commandList,err=GetRunnableDaemonList() + if err != nil { + log.Printf("getRunnableDaemonList error:%v", err) + logger.Printf("getRunnableDaemonList error:%v", err) + }else { + + if len(commandList) != 0 { + for _, cmdPkg := range commandList { + cmdPkg := cmdPkg + go func() { + err:=Instantiate(cmdPkg) + if err!=nil{ + logger.Printf("Launch container failure:%v",err) + return + } + err = SetRunnableDaemonInitialized(cmdPkg) + if err != nil { + logger.Printf("setDaemonInit grpc error:%v", err) + return + } + }() + } + } + } + + commandList,err=GetTerminatedDaemonList() + if err != nil { + log.Printf("getTerminatedDaemonList error:%v", err) + logger.Fatalf("getTerminatedDaemonList error:%v", err) + }else { + + if len(commandList) != 0 { + for _, cmdPkg := range commandList { + cmdPkg := cmdPkg + go func() { + err:=Instantiate(cmdPkg) + if err!=nil{ + logger.Printf("Launch container failure:%v",err) + return + } + err = SetRunnableDaemonTerminated(cmdPkg) + if err != nil { + logger.Printf("setDaemonTerminated grpc error:%v", err) + return + } + }() + } + } + } + + + time.Sleep(time.Second*120) + } + +}