@@ -24,9 +24,13 @@ import (
2424 "k8s.io/client-go/kubernetes"
2525 "k8s.io/client-go/rest"
2626 "k8s.io/client-go/tools/cache"
27+ "strings"
2728
2829 crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1"
30+ "github.com/crunchydata/postgres-operator/kubeapi"
31+
2932 backupoperator "github.com/crunchydata/postgres-operator/operator/backup"
33+ "k8s.io/client-go/util/workqueue"
3034)
3135
3236// PgbackupController holds connections required by the controller
@@ -35,19 +39,28 @@ type PgbackupController struct {
3539 PgbackupScheme * runtime.Scheme
3640 PgbackupClientset * kubernetes.Clientset
3741 Namespace string
42+ Queue workqueue.RateLimitingInterface
43+ UpdateQueue workqueue.RateLimitingInterface
3844}
3945
4046// Run starts controller
4147func (c * PgbackupController ) Run (ctx context.Context ) error {
4248 log .Debugf ("Watch Pgbackup objects" )
4349
50+ //shut down the work queue to cause workers to end
51+ defer c .Queue .ShutDown ()
52+ defer c .UpdateQueue .ShutDown ()
53+
4454 _ , err := c .watchPgbackups (ctx )
4555 if err != nil {
4656 log .Errorf ("Failed to register watch for Pgbackup resource: %v" , err )
4757 return err
4858 }
4959
5060 <- ctx .Done ()
61+
62+ log .Debugf ("Watch Pgbackup ending" )
63+
5164 return ctx .Err ()
5265}
5366
@@ -81,6 +94,148 @@ func (c *PgbackupController) watchPgbackups(ctx context.Context) (cache.Controll
8194 return controller , nil
8295}
8396
97+ func (c * PgbackupController ) RunWorker () {
98+
99+ //process the 'add' work queue forever
100+ for c .processNextItem () {
101+ }
102+ }
103+
104+ func (c * PgbackupController ) RunUpdateWorker () {
105+
106+ //process the 'add' work queue forever
107+ for c .processNextUpdateItem () {
108+ }
109+ }
110+
111+ func (c * PgbackupController ) processNextUpdateItem () bool {
112+ // Wait until there is a new item in the working queue
113+ key , quit := c .UpdateQueue .Get ()
114+ if quit {
115+ return false
116+ }
117+
118+ log .Debugf ("working on %s" , key .(string ))
119+ keyParts := strings .Split (key .(string ), "/" )
120+ keyNamespace := keyParts [0 ]
121+ keyResourceName := keyParts [1 ]
122+
123+ log .Debugf ("update queue got key ns=[%s] resource=[%s]" , keyNamespace , keyResourceName )
124+
125+ // Tell the queue that we are done with processing this key. This unblocks the key for other workers
126+ // This allows safe parallel processing because two pods with the same key are never processed in
127+ // parallel.
128+ defer c .UpdateQueue .Done (key )
129+
130+ // Invoke the method containing the business logic
131+ // for pgbackups, the convention is the CRD name is always
132+ // the same as the pg-cluster label value
133+
134+ // in this case, the de-dupe logic is to test whether a backup
135+ // job is already running, if so, then we don't create another
136+ // backup job
137+ selector := "pg-cluster=" + keyResourceName + ",pgbackup=true"
138+ jobs , err := kubeapi .GetJobs (c .PgbackupClientset , selector , keyNamespace )
139+ if err != nil {
140+ log .Errorf ("update working...error found " + err .Error ())
141+ return true
142+ }
143+
144+ jobRunning := false
145+ for _ , j := range jobs .Items {
146+ if j .Status .Succeeded <= 0 {
147+ jobRunning = true
148+ }
149+ }
150+
151+ if jobRunning {
152+ log .Debugf ("update working...found job already, would do nothing" )
153+ } else {
154+ log .Debugf ("update working...no job found, means we process" )
155+ b := crv1.Pgbackup {}
156+ found , err := kubeapi .Getpgbackup (c .PgbackupClient , & b , keyResourceName , keyNamespace )
157+ if found {
158+ state := crv1 .PgbackupStateProcessed
159+ message := "Successfully processed Pgbackup by controller"
160+ err = kubeapi .PatchpgbackupStatus (c .PgbackupClient , state , message , & b , b .ObjectMeta .Namespace )
161+ if err != nil {
162+ log .Errorf ("ERROR updating pgbackup status: %s" , err .Error ())
163+ }
164+
165+ backupoperator .AddBackupBase (c .PgbackupClientset , c .PgbackupClient , & b , b .ObjectMeta .Namespace )
166+
167+ //no error, tell the queue to stop tracking history
168+ c .UpdateQueue .Forget (key )
169+ }
170+ }
171+
172+ return true
173+ }
174+
175+ func (c * PgbackupController ) processNextItem () bool {
176+ // Wait until there is a new item in the working queue
177+ key , quit := c .Queue .Get ()
178+
179+ if quit {
180+ return false
181+ }
182+
183+ log .Debugf ("working on %s" , key .(string ))
184+ keyParts := strings .Split (key .(string ), "/" )
185+ keyNamespace := keyParts [0 ]
186+ keyResourceName := keyParts [1 ]
187+
188+ log .Debugf ("queue got key ns=[%s] resource=[%s]" , keyNamespace , keyResourceName )
189+
190+ // Tell the queue that we are done with processing this key. This unblocks the key for other workers
191+ // This allows safe parallel processing because two pods with the same key are never processed in
192+ // parallel.
193+ defer c .Queue .Done (key )
194+
195+ // Invoke the method containing the business logic
196+ // for pgbackups, the convention is the CRD name is always
197+ // the same as the pg-cluster label value
198+
199+ // in this case, the de-dupe logic is to test whether a backup
200+ // job is already running, if so, then we don't create another
201+ // backup job
202+ selector := "pg-cluster=" + keyResourceName + ",pgbackup=true"
203+ jobs , err := kubeapi .GetJobs (c .PgbackupClientset , selector , keyNamespace )
204+ if err != nil {
205+ log .Errorf ("working...error found " + err .Error ())
206+ return true
207+ }
208+
209+ jobRunning := false
210+ for _ , j := range jobs .Items {
211+ if j .Status .Succeeded <= 0 {
212+ jobRunning = true
213+ }
214+ }
215+
216+ if jobRunning {
217+ log .Debugf ("working...found job already, would do nothing" )
218+ } else {
219+ log .Debugf ("working...no job found, means we process" )
220+ b := crv1.Pgbackup {}
221+ found , err := kubeapi .Getpgbackup (c .PgbackupClient , & b , keyResourceName , keyNamespace )
222+ if found {
223+ state := crv1 .PgbackupStateProcessed
224+ message := "Successfully processed Pgbackup by controller"
225+ err = kubeapi .PatchpgbackupStatus (c .PgbackupClient , state , message , & b , b .ObjectMeta .Namespace )
226+ if err != nil {
227+ log .Errorf ("ERROR updating pgbackup status: %s" , err .Error ())
228+ }
229+ backupoperator .AddBackupBase (c .PgbackupClientset , c .PgbackupClient , & b , b .ObjectMeta .Namespace )
230+
231+ //no error, tell the queue to stop tracking history
232+ c .Queue .Forget (key )
233+ }
234+ }
235+ return true
236+ }
237+
238+
84239// onAdd is called when a pgbackup is added
85240func (c * PgbackupController ) onAdd (obj interface {}) {
86241 backup := obj .(* crv1.Pgbackup )
@@ -93,37 +248,32 @@ func (c *PgbackupController) onAdd(obj interface{}) {
93248 return
94249 }
95250
96- // NEVER modify objects from the store. It's a read-only, local cache.
97- // You can use backupScheme.Copy() to make a deep copy of original object and modify this copy
98- // Or create a copy manually for better performance
99- copyObj := backup .DeepCopyObject ()
100-
101- backupCopy := copyObj .(* crv1.Pgbackup )
102- backupCopy .Status = crv1.PgbackupStatus {
103- State : crv1 .PgbackupStateProcessed ,
104- Message : "Successfully processed Pgbackup by controller" ,
105- }
106-
107- err := c .PgbackupClient .Put ().
108- Name (backup .ObjectMeta .Name ).
109- Namespace (backup .ObjectMeta .Namespace ).
110- Resource (crv1 .PgbackupResourcePlural ).
111- Body (backupCopy ).
112- Do ().
113- Error ()
114-
115- if err != nil {
116- log .Errorf ("ERROR updating pgbackup status: %s" , err .Error ())
251+ key , err := cache .MetaNamespaceKeyFunc (obj )
252+ if err == nil {
253+ log .Debugf ("[PgbackupController] putting key in queue %s" , key )
254+ c .Queue .Add (key )
117255 }
118256
119257 //handle new pgbackups
120- backupoperator .AddBackupBase (c .PgbackupClientset , c .PgbackupClient , backupCopy , backup .ObjectMeta .Namespace )
258+ // backupoperator.AddBackupBase(c.PgbackupClientset, c.PgbackupClient, backupCopy, backup.ObjectMeta.Namespace)
121259}
122260
123261// onUpdate is called when a pgbackup is updated
124262func (c * PgbackupController ) onUpdate (oldObj , newObj interface {}) {
263+ oldBackup := oldObj .(* crv1.Pgbackup )
125264 backup := newObj .(* crv1.Pgbackup )
126265 log .Debugf ("[PgbackupController] ns=%s onUpdate %s" , backup .ObjectMeta .Namespace , backup .ObjectMeta .SelfLink )
266+
267+ // check for re-submission of backup
268+ if oldBackup .Spec .BackupStatus != crv1 .PgBackupJobReSubmitted && backup .Spec .BackupStatus == crv1 .PgBackupJobReSubmitted {
269+ log .Debugf ("[PgbackupController] ns=%s onUpdate %s re-submitted" , backup .ObjectMeta .Namespace , backup .ObjectMeta .SelfLink )
270+ key , err := cache .MetaNamespaceKeyFunc (oldObj )
271+ if err == nil {
272+ log .Debugf ("[PgbackupController] putting key in update queue %s" , key )
273+ c .UpdateQueue .Add (key )
274+ }
275+ }
276+
127277}
128278
129279// onDelete is called when a pgbackup is deleted
0 commit comments