forked from dodopizza/kubectl-shovel
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlaunch.go
More file actions
149 lines (121 loc) · 4.2 KB
/
launch.go
File metadata and controls
149 lines (121 loc) · 4.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package cmd
import (
"context"
"fmt"
"strings"
"github.com/pkg/errors"
"github.com/dodopizza/kubectl-shovel/internal/events"
"github.com/dodopizza/kubectl-shovel/internal/flags"
"github.com/dodopizza/kubectl-shovel/internal/globals"
"github.com/dodopizza/kubectl-shovel/internal/kubernetes"
"github.com/dodopizza/kubectl-shovel/internal/utils"
"github.com/dodopizza/kubectl-shovel/internal/watchdog"
)
// newKubeClient creates a kubernetes client from the kubeconfig options held in
// cb.CommonOptions and stores it in cb.kube.
//
// If kubernetes.NewClient fails, its error is returned unchanged and cb.kube is
// left untouched.
func (cb *CommandBuilder) newKubeClient() error {
	client, err := kubernetes.NewClient(cb.CommonOptions.kubeConfig)
	if err == nil {
		cb.kube = client
		return nil
	}

	return err
}
// args assembles the argument list for the diagnostics job that targets the
// given pod and container.
//
// The resulting order is: container id and runtime, the optional
// "store-output-on-host" key, container name, pod name and namespace, the raw
// tool name, and finally whatever the tool itself contributes through
// FormatArgs.
func (cb *CommandBuilder) args(pod *kubernetes.PodInfo, container *kubernetes.ContainerInfo) []string {
	// Identify the container the tool has to attach to.
	jobArgs := flags.NewArgs().
		Append("container-id", container.ID).
		Append("container-runtime", container.Runtime)

	// Only emitted when the user asked to keep the output on the node.
	if cb.CommonOptions.StoreOutputOnHost {
		jobArgs.AppendKey("store-output-on-host")
	}

	// Describe the target, then name the tool to run.
	jobArgs.
		Append("container-name", container.Name).
		Append("pod-name", pod.Name).
		Append("pod-namespace", pod.Namespace).
		AppendRaw(cb.tool.ToolName())

	// Let the tool append its own arguments after its name.
	cb.tool.FormatArgs(jobArgs, flags.FormatArgsTypeTool)

	return jobArgs.Get()
}
// copyOutput copies the diagnostics output from the given pod to the local
// destination configured in cb.CommonOptions.Output.
//
// A watchdog pinger for the pod runs in a background goroutine while the copy
// is in progress; its context is cancelled when this function returns. An error
// from the pinger is only printed and does not affect the returned value.
//
// A copy failure is returned wrapped with a descriptive message.
func (cb *CommandBuilder) copyOutput(pod *kubernetes.PodInfo, output string) error {
	pingCtx, stopPinger := context.WithCancel(context.Background())
	defer stopPinger()

	pinger := watchdog.NewPinger(cb.kube, pod.Name)
	go func() {
		err := pinger.Run(pingCtx)
		if err == nil {
			return
		}
		fmt.Println(err)
	}()

	fmt.Println("Retrieve output from diagnostics job")

	destination := cb.CommonOptions.Output
	err := cb.kube.CopyFromPod(pod.Name, output, destination)
	if err != nil {
		return errors.Wrap(err, "Error while retrieving diagnostics job output")
	}

	fmt.Printf("Result successfully written to %s\n", destination)
	return nil
}
// storeOutputOnHost is the output handler used when the output is kept on the
// node. It copies nothing: it only prints the pod's node and the output path,
// formed as cb.CommonOptions.OutputHostPath + "/" + output, and always returns
// nil.
func (cb *CommandBuilder) storeOutputOnHost(pod *kubernetes.PodInfo, output string) error {
	fmt.Printf(
		"Output located on host: %s, at path: %s/%s\n",
		pod.Node,
		cb.CommonOptions.OutputHostPath,
		output,
	)

	return nil
}
// launch runs the configured diagnostics tool against the target pod and
// handles its output.
//
// Steps:
//  1. Initialise the kubernetes client and fetch info about the target pod.
//  2. Resolve the target container: the container from the common options, or,
//     when that is empty, the pod's
//     "kubectl.kubernetes.io/default-container" annotation.
//  3. Build the job spec (container FS volume, optional container mounts
//     volume, optional host tmp volume, privileged options for privileged
//     tools) and run the job.
//  4. Wait for the job pod, read its logs and await the completed event.
//  5. Hand the value returned by the completed event to the output handler:
//     copyOutput by default, storeOutputOnHost when StoreOutputOnHost is set.
//  6. Delete the job.
//
// Every failure is returned to the caller, wrapped with a message naming the
// step that failed (errors from the output handler are returned as-is).
//
// NOTE(review): when any step after RunJob fails, the function returns before
// DeleteJob is reached, so the job is left in the cluster. Confirm whether that
// is intentional (e.g. to allow manual inspection) before changing it.
func (cb *CommandBuilder) launch() error {
	if err := cb.newKubeClient(); err != nil {
		return errors.Wrap(err, "Failed to init kubernetes client")
	}

	targetPod, err := cb.kube.GetPodInfo(cb.CommonOptions.Pod)
	if err != nil {
		return errors.Wrap(err, "Failed to get info about target pod")
	}

	// Prefer the explicitly requested container, then the pod's default
	// container annotation. The name may still be empty after this.
	targetContainerName := cb.CommonOptions.Container
	if targetContainerName == "" {
		targetContainerName = targetPod.Annotations["kubectl.kubernetes.io/default-container"]
	}

	targetContainer, err := targetPod.FindContainerInfo(targetContainerName)
	if err != nil {
		return errors.Wrap(err, "Failed to get info about target container")
	}

	jobSpec := kubernetes.
		NewJobRunSpec(cb.args(targetPod, targetContainer), cb.CommonOptions.Image, targetPod).
		WithContainerFSVolume(targetContainer)

	// Query by the name of the container that was actually resolved:
	// targetContainerName can be empty here when neither the option nor the
	// annotation is set, while targetContainer.Name always names the container
	// the job is built for.
	if targetPod.ContainsMountedTmp(targetContainer.Name) {
		jobSpec.WithContainerMountsVolume(targetContainer)
	}

	if cb.CommonOptions.StoreOutputOnHost {
		jobSpec.WithHostTmpVolume(cb.CommonOptions.OutputHostPath)
	}

	// additional spec for privileged tool command
	if cb.tool.IsPrivileged() {
		jobSpec.
			WithPrivilegedOptions().
			WithHostProcVolume()
	}

	fmt.Printf("Spawning diagnostics job with command:\n%s\n", strings.Join(jobSpec.Args, " "))
	if err := cb.kube.RunJob(jobSpec); err != nil {
		return errors.Wrap(err, "Failed to spawn diagnostics job")
	}

	fmt.Println("Waiting for a diagnostics job to start")
	jobPod, err := cb.kube.WaitPod(jobSpec.Selectors)
	if err != nil {
		return errors.Wrap(err, "Failed to start diagnostics job")
	}

	// The logs are read from the job pod, not from the target pod.
	logs, err := cb.kube.ReadPodLogs(jobPod.Name, globals.PluginName)
	if err != nil {
		return errors.Wrap(err, "Failed to read logs from diagnostics job pod")
	}
	defer utils.Ignore(logs.Close)

	awaiter := events.NewEventAwaiter()
	output, err := awaiter.AwaitCompletedEvent(logs)
	if err != nil {
		return errors.Wrap(err, "Failed to complete diagnostics job")
	}

	// dealing with output
	outputHandler := cb.copyOutput
	if cb.CommonOptions.StoreOutputOnHost {
		outputHandler = cb.storeOutputOnHost
	}

	if err := outputHandler(jobPod, output); err != nil {
		return err
	}

	fmt.Println("Cleanup diagnostics job")
	if err := cb.kube.DeleteJob(jobSpec.Name); err != nil {
		return errors.Wrap(err, "Error while deleting job")
	}

	return nil
}