Commit 586959db authored by Zhou Yaochen's avatar Zhou Yaochen
Browse files

init

parent 71fa7875
Pipeline #10 canceled with stages
package actuator
import (
"context"
"github.com/golang/protobuf/ptypes/empty"
wrps "github.com/golang/protobuf/ptypes/wrappers"
pb "goagent/scalebox"
"google.golang.org/grpc"
"log"
"os"
)
var address =os.Getenv("grpc_server")
//todo
var logger,_=makeLog("./")
func GetRunnableSlotList() ([]*pb.Command,error) {
//test
address =os.Getenv("grpc_server")
//connecting
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Printf("Connection error:%v", err)
logger.Printf("Connection error:%v", err)
return nil,err
}
defer conn.Close()
c := pb.NewBoxServiceClient(conn)
res,err:=c.GetRunnableSlotList(context.Background(),&empty.Empty{})
if err!=nil{
log.Printf("getRunnableSlotList grpc error:%v",err)
logger.Printf("getRunnableSlotList grpc error:%v",err)
}
return res.Command,nil
}
func GetRunnableDaemonList() ([]*pb.Command,error) {
//connecting
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Printf("Connection error:%v", err)
logger.Printf("Connection error:%v", err)
return nil,err
}
defer conn.Close()
c := pb.NewBoxServiceClient(conn)
res,err:=c.GetRunnableDaemonList(context.Background(),&empty.Empty{})
if err!=nil{
log.Printf("getRunnableDaemonList grpc error:%v",err)
logger.Printf("getRunnableDaemonList grpc error:%v",err)
}
return res.Command,nil
}
func GetTerminatedDaemonList() ([]*pb.Command,error) {
//connecting
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Printf("Connection error:%v", err)
logger.Printf("Connection error:%v", err)
return nil,err
}
defer conn.Close()
c := pb.NewBoxServiceClient(conn)
res,err:=c.GetTerminableDaemonList(context.Background(),&empty.Empty{})
if err!=nil{
log.Printf("getTerminatedDaemonList grpc error:%v",err)
logger.Printf("getTerminatedDaemonList grpc error:%v",err)
}
return res.Command,nil
}
func SetSlotInitialized(cmdPkg *pb.Command)error {
//connecting
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Printf("Connection error:%v", err)
logger.Printf("Connection error:%v", err)
return err
}
defer conn.Close()
c := pb.NewBoxServiceClient(conn)
retCode,err:=c.SetSlotInitialized(context.Background(),&wrps.Int32Value{Value: cmdPkg.Id})
if err!=nil{
log.Printf("setSlotInit grpc error:%v",err)
logger.Printf("setSlotInit grpc error:%v",err)
return err
}
logger.Printf("recode:%v",retCode)
return nil
}
func SetRunnableDaemonInitialized(cmdPkg *pb.Command)error {
//connecting
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Printf("Connection error:%v", err)
logger.Printf("Connection error:%v", err)
return err
}
defer conn.Close()
c := pb.NewBoxServiceClient(conn)
retCode,err:=c.SetDaemonInitialized(context.Background(),&wrps.Int32Value{Value: cmdPkg.Id})
if err!=nil{
log.Printf("setDaemonInit grpc error:%v",err)
logger.Printf("setDaemonInit grpc error:%v",err)
return err
}
logger.Printf("recode:%v",retCode)
return nil
}
func SetRunnableDaemonTerminated(cmdPkg *pb.Command)error{
//connecting
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Printf("Connection error:%v", err)
logger.Printf("Connection error:%v", err)
return err
}
defer conn.Close()
c := pb.NewBoxServiceClient(conn)
retCode,err:=c.SetDaemonTerminated(context.Background(),&wrps.Int32Value{Value: cmdPkg.Id})
if err!=nil{
log.Printf("setDaemonTerminated grpc error:%v",err)
logger.Printf("setDaemonTerminated grpc error:%v",err)
return err
}
logger.Printf("recode:%v",retCode)
return nil
}
func GetRunnableSlotList() ([]*pb.Command,error) {
//connecting
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Printf("Connection error:%v", err)
logger.Printf("Connection error:%v", err)
return nil,err
}
defer conn.Close()
c := pb.NewBoxServiceClient(conn)
res,err:=c.GetRunnableSlotList(context.Background(),&empty.Empty{})
if err!=nil{
log.Printf("getRunnableSlotList grpc error:%v",err)
logger.Printf("getRunnableSlotList grpc error:%v",err)
}
return res.Command,nil
}
func GetRunnableDaemonList() ([]*pb.Command,error) {
//connecting
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Printf("Connection error:%v", err)
logger.Printf("Connection error:%v", err)
return nil,err
}
defer conn.Close()
c := pb.NewBoxServiceClient(conn)
res,err:=c.GetRunnableDaemonList(context.Background(),&empty.Empty{})
if err!=nil{
log.Printf("getRunnableDaemonList grpc error:%v",err)
logger.Printf("getRunnableDaemonList grpc error:%v",err)
}
return res.Command,nil
}
func GetList(tag string)([]*pb.Command,error) {
//for testing
address=os.Getenv("grpc_server")
//connecting
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Printf("Connection error:%v", err)
logger.Printf("Connection error:%v", err)
return nil,err
}
defer conn.Close()
c := pb.NewBoxServiceClient(conn)
switch tag {
case "runnableSlot":
res,err:=c.GetRunnableSlotList(context.Background(),&empty.Empty{})
if err!=nil{
log.Printf("getRunnableSlotList grpc error:%v",err)
logger.Printf("getRunnableSlotList grpc error:%v",err)
}
return res.Command,nil
case "runnableDaemon":
res,err:=c.GetRunnableDaemonList(context.Background(),&empty.Empty{})
if err!=nil{
log.Printf("getRunnableDaemonList grpc error:%v",err)
logger.Printf("getRunnableDaemonList grpc error:%v",err)
}
return res.Command,nil
case "terminatedDaemon":
res,err:=c.GetTerminatedDaemonList(context.Background(),&empty.Empty{})
if err!=nil{
log.Printf("getTerminatedDaemonList grpc error:%v",err)
logger.Printf("getTerminatedDaemonList grpc error:%v",err)
}
return res.Command,nil
}
return nil, errors.New("no such a tag")
}
\ No newline at end of file
package actuator
import pb "goagent/scalebox"
/*
@Time : 2021/10/25 下午2:57
@Author : Zhouyc
*/
func Instantiate(cmdPkg *pb.Command)error {
var factory DockerLauncherCreator
//todo with slurm
factory=new(ViaSSHCreator)
launcher:=factory.Create()
err:=launcher.launchD(cmdPkg.Host,cmdPkg.CommandText)
if err!=nil{
return err
}
return nil
}
\ No newline at end of file
package actuator
import (
"log"
)
/*
@Time : 2021/10/21 下午3:19
@Author : Zhouyc
*/
type DockerLauncher interface {
launchD(host string,cmd string) error
}
type DockerLauncherCreator interface {
Create() DockerLauncher
}
type ViaSSH struct {
}
func (s *ViaSSH)launchD(host string,cmd string)error{
cmdline:="ssh"+" "+host+" "+"\""+cmd+"\""
timeout:=60
res,err:=launch(cmdline,timeout)
if err!=nil{
log.Printf("ssh error:%v",err)
logger.Printf("ssh error:%v",err)
return err
}
log.Printf(res)
logger.Printf(res)
return nil
}
type ViaSSHCreator struct {
}
func (sc *ViaSSHCreator)Create()DockerLauncher{
l:=new(ViaSSH)
return l
}
package actuator
/*
@Time : 2021/10/25 下午7:07
@Author : Zhouyc
*/
import (
"log"
"testing"
)
func TestLaunch(t *testing.T) {
timeout:=5
out,err:=launch("../shells/3s.sh",timeout)
if err!=nil{
t.Error(err)
}
log.Print(out)
out,err=launch("../shells/10s.sh",timeout)
if err!=nil {
if err.Error() != "sh script timeout" {
t.Error(err)
}
}
log.Print(out)
}
\ No newline at end of file
package actuator
/*
@Time : 2021/10/21 上午10:49
@Author : Zhouyc
*/
import (
"bytes"
"errors"
"log"
"os/exec"
)
import "time"
//import "errors"
func launch(command string,timeout int)(string,error) {
var shellErr=make(chan error)
var done=make(chan string)
cmd:=exec.Command("/usr/bin/bash",command)
var stdout,stderr bytes.Buffer
cmd.Stdout=&stdout
cmd.Stderr=&stderr
//launch
go func() {
err:=cmd.Run()
if err!=nil{
shellErr <- err
log.Printf("Unable to execute shells:%v",err)
return
}
log.Printf("Shells ran successfully")
done<-stdout.String()+stderr.String()
}()
// timeout==-1 means no need for timeout function
if timeout!=-1 {
select {
case <-time.After(time.Second * time.Duration(timeout)):
log.Printf("Shell scripts timeout")
err := cmd.Process.Kill()
if err!=nil{
return "",err
}
return "", errors.New("sh script timeout")
case result := <-done:
log.Printf("stdout&stderr:%s", result)
return result, nil
case err := <-shellErr:
log.Printf("sh script error:%v", err)
return "", err
}
}
select {
case result := <-done:
log.Printf("stdout&stderr:%s", result)
return result, nil
case err := <-shellErr:
log.Printf("error occured while launching:%v", err)
return "shell running error", err
}
}
package actuator
/*
@Time : 2021/10/21 上午11:19
@Author : Zhouyc
*/
import "os"
import "log"
import "time"
func makeLog(path string) (*log.Logger ,error) {
logFile,err:=os.Create(path+time.Now().Format("2006-01-02")+".log")
if err!=nil{
log.Printf("Cannot create logFile:%v",err)
return nil,err
}
logger:=log.New(logFile,"<actuator>",log.Ldate|log.Ltime|log.Lshortfile)
return logger,nil
}
package actuator
import (
"log"
"time"
)
/*
@Time : 2021/10/21 下午5:11
@Author : Zhouyc
*/
func main() {
for {
commandList, err :=GetRunnableSlotList()
if err != nil {
log.Printf("getRunnableSlotList error:%v", err)
logger.Printf("getRunnableSlotList error:%v", err)
}else {
if len(commandList) != 0 {
for _, cmdPkg := range commandList {
cmdPkg := cmdPkg
go func() {
err:=Instantiate(cmdPkg)
if err!=nil{
logger.Printf("Launch container failure:%v",err)
return
}
err = SetSlotInitialized(cmdPkg)
if err != nil {
logger.Printf("setSlotInit grpc error:%v", err)
return
}
}()
}
}
}
commandList,err=GetRunnableDaemonList()
if err != nil {
log.Printf("getRunnableDaemonList error:%v", err)
logger.Printf("getRunnableDaemonList error:%v", err)
}else {
if len(commandList) != 0 {
for _, cmdPkg := range commandList {
cmdPkg := cmdPkg
go func() {
err:=Instantiate(cmdPkg)
if err!=nil{
logger.Printf("Launch container failure:%v",err)
return
}
err = SetRunnableDaemonInitialized(cmdPkg)
if err != nil {
logger.Printf("setDaemonInit grpc error:%v", err)
return
}
}()
}
}
}
commandList,err=GetTerminatedDaemonList()
if err != nil {
log.Printf("getTerminatedDaemonList error:%v", err)
logger.Fatalf("getTerminatedDaemonList error:%v", err)
}else {
if len(commandList) != 0 {
for _, cmdPkg := range commandList {
cmdPkg := cmdPkg
go func() {
err:=Instantiate(cmdPkg)
if err!=nil{
logger.Printf("Launch container failure:%v",err)
return
}
err = SetRunnableDaemonTerminated(cmdPkg)
if err != nil {
logger.Printf("setDaemonTerminated grpc error:%v", err)
return
}
}()
}
}
}
time.Sleep(time.Second*120)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment