Commit ef24ec1

flamingdumpster authored and Jonathan S. Katz committed
Add queue to pgtask backport (#944)
* Backport of Queue pgbackup PR #915 to operator 3.5
* add queue to cluster controller for onAdd events
* Remove commented code after testing.
* backport of pgtask queue
1 parent b24f1d6 commit ef24ec1
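For context: this backport moves pgtask processing out of the informer callbacks and onto a client-go work queue. The onAdd handler now only derives a namespace/name key and enqueues it; a worker goroutine dequeues keys, re-reads the pgtask from the API server, and dispatches on its task type. Below is a minimal sketch of that pattern, assuming only the client-go workqueue and cache packages; the Controller type and function names are illustrative stand-ins, not the operator's actual code.

package sketch

import (
	"strings"

	log "github.com/sirupsen/logrus"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// Controller is a hypothetical stand-in for PgtaskController: the queue
// decouples event delivery from event processing.
type Controller struct {
	Queue workqueue.RateLimitingInterface
}

// NewController wires a rate-limiting queue; the default rate limiter
// combines per-item exponential backoff with an overall token bucket.
func NewController() *Controller {
	return &Controller{
		Queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
	}
}

// onAdd mirrors the commit's pattern: derive a "namespace/name" key from
// the object and enqueue it rather than doing the work in the handler.
func (c *Controller) onAdd(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err == nil {
		c.Queue.Add(key)
	}
}

// RunWorker drains the queue until ShutDown is called.
func (c *Controller) RunWorker() {
	for c.processNextItem() {
	}
}

func (c *Controller) processNextItem() bool {
	key, quit := c.Queue.Get()
	if quit {
		return false
	}
	// Done marks the key finished so it can be enqueued (and worked) again;
	// while in flight, the same key is never handed to a second worker.
	defer c.Queue.Done(key)

	// Keys for namespaced objects have the form "namespace/name".
	keyParts := strings.Split(key.(string), "/")
	if len(keyParts) == 2 {
		log.Debugf("processing ns=[%s] resource=[%s]", keyParts[0], keyParts[1])
	}
	return true
}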


6 files changed: +232 -93 lines changed


controller/replicacontroller.go

Lines changed: 15 additions & 27 deletions
@@ -26,6 +26,7 @@ import (

 	crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1"
 	"github.com/crunchydata/postgres-operator/kubeapi"
+	"github.com/crunchydata/postgres-operator/util"
 	clusteroperator "github.com/crunchydata/postgres-operator/operator/cluster"
 	"k8s.io/client-go/util/workqueue"
 	"strings"
@@ -167,29 +168,6 @@ func (c *PgreplicaController) onAdd(obj interface{}) {
 		return
 	}

-	// NEVER modify objects from the store. It's a read-only, local cache.
-	// You can use clusterScheme.Copy() to make a deep copy of original object and modify this copy
-	// Or create a copy manually for better performance
-	// copyObj := replica.DeepCopyObject()
-	// replicaCopy := copyObj.(*crv1.Pgreplica)
-
-	// replicaCopy.Status = crv1.PgreplicaStatus{
-	//	State:   crv1.PgreplicaStateProcessed,
-	//	Message: "Successfully processed Pgreplica by controller",
-	// }
-
-	// err := c.PgreplicaClient.Put().
-	//	Name(replica.ObjectMeta.Name).
-	//	Namespace(replica.ObjectMeta.Namespace).
-	//	Resource(crv1.PgreplicaResourcePlural).
-	//	Body(replicaCopy).
-	//	Do().
-	//	Error()
-
-	// if err != nil {
-	//	log.Errorf("ERROR updating pgreplica status: %s", err.Error())
-	// }
-
 	key, err := cache.MetaNamespaceKeyFunc(obj)
 	if err == nil {
 		log.Debugf("onAdd putting key in queue %s", key)
@@ -198,10 +176,6 @@ func (c *PgreplicaController) onAdd(obj interface{}) {
 		log.Errorf("replicacontroller: error acquiring key: %s", err.Error())
 	}

-	//handle the case of when a pgreplica is added which is
-	//scaling up a cluster
-	// clusteroperator.ScaleBase(c.PgreplicaClientset, c.PgreplicaClient, replicaCopy, replicaCopy.ObjectMeta.Namespace)
-
 }

 // onUpdate is called when a pgreplica is updated
@@ -215,4 +189,18 @@ func (c *PgreplicaController) onUpdate(oldObj, newObj interface{}) {
 func (c *PgreplicaController) onDelete(obj interface{}) {
 	replica := obj.(*crv1.Pgreplica)
 	log.Debugf("[PgreplicaController] OnDelete ns=%s %s", replica.ObjectMeta.Namespace, replica.ObjectMeta.SelfLink)
+
+	//make sure we are not removing a replica deployment
+	//that is now the primary after a failover
+	dep, found, _ := kubeapi.GetDeployment(c.PgreplicaClientset, replica.Spec.Name, replica.ObjectMeta.Namespace)
+	if found {
+		if dep.ObjectMeta.Labels[util.LABEL_SERVICE_NAME] == dep.ObjectMeta.Labels[util.LABEL_PG_CLUSTER] {
+			//the replica was made a primary at some point
+			//we will not scale down the deployment
+			log.Debugf("[PgreplicaController] OnDelete not scaling down the replica since it is acting as a primary")
+		} else {
+			clusteroperator.ScaleDownBase(c.PgreplicaClientset, c.PgreplicaClient, replica, replica.ObjectMeta.Namespace)
+		}
+	}
+
 }
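One behavioral note on the onDelete change above: the guard relies on the operator's label convention that a deployment promoted during a failover carries a service-name label equal to its pg-cluster label, so deleting the pgreplica of a promoted replica must not scale its deployment down. A hypothetical predicate capturing just that check (the literal label keys are assumed to match util.LABEL_SERVICE_NAME and util.LABEL_PG_CLUSTER; this helper is not part of the commit):

// actsAsPrimary reports whether a deployment created for a replica has
// since been promoted: by the operator's convention, its service-name
// label then equals its cluster label.
func actsAsPrimary(labels map[string]string) bool {
	// "service-name" and "pg-cluster" are assumed values of
	// util.LABEL_SERVICE_NAME and util.LABEL_PG_CLUSTER.
	svc, cluster := labels["service-name"], labels["pg-cluster"]
	return svc != "" && svc == cluster
}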

controller/taskcontroller.go

Lines changed: 123 additions & 66 deletions
@@ -17,16 +17,17 @@ limitations under the License.

 import (
 	"context"
-	// "time"
-
+	"strings"
 	log "github.com/sirupsen/logrus"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"

 	crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1"
+	"github.com/crunchydata/postgres-operator/util"
 	"github.com/crunchydata/postgres-operator/kubeapi"
 	backrestoperator "github.com/crunchydata/postgres-operator/operator/backrest"
 	benchmarkoperator "github.com/crunchydata/postgres-operator/operator/benchmark"
@@ -41,13 +42,17 @@ type PgtaskController struct {
 	PgtaskClient    *rest.RESTClient
 	PgtaskScheme    *runtime.Scheme
 	PgtaskClientset *kubernetes.Clientset
+	Queue           workqueue.RateLimitingInterface
 	Namespace       string
 }

 // Run starts an pgtask resource controller
 func (c *PgtaskController) Run(ctx context.Context) error {
 	log.Debug("Watch Pgtask objects")

+	//shut down the work queue to cause workers to end
+	defer c.Queue.ShutDown()
+
 	// Watch Example objects
 	_, err := c.watchPgtasks(ctx)
 	if err != nil {
@@ -89,129 +94,143 @@ func (c *PgtaskController) watchPgtasks(ctx context.Context) (cache.Controller,
 	return controller, nil
 }

-// onAdd is called when a pgtask is added
-func (c *PgtaskController) onAdd(obj interface{}) {
-	task := obj.(*crv1.Pgtask)
-	log.Debugf("[PgtaskController] onAdd ns=%s %s", task.ObjectMeta.Namespace, task.ObjectMeta.SelfLink)
-
-	//handle the case of when the operator restarts, we do not want
-	//to process pgtasks already processed
-	if task.Status.State == crv1.PgtaskStateProcessed {
-		log.Debug("pgtask " + task.ObjectMeta.Name + " already processed")
-		return
-	}

-	//time.Sleep(time.Second * time.Duration(2))

-	// NEVER modify objects from the store. It's a read-only, local cache.
-	// You can use taskScheme.Copy() to make a deep copy of original object and modify this copy
-	// Or create a copy manually for better performance
-	copyObj := task.DeepCopyObject()
-	taskCopy := copyObj.(*crv1.Pgtask)
+func (c *PgtaskController) RunWorker() {

-	//update the status of the task as processed to prevent reprocessing
-	taskCopy.Status = crv1.PgtaskStatus{
-		State:   crv1.PgtaskStateProcessed,
-		Message: "Successfully processed Pgtask by controller",
+	//process the 'add' work queue forever
+	for c.processNextItem() {
 	}
-	task.Status = crv1.PgtaskStatus{
-		State:   crv1.PgtaskStateProcessed,
-		Message: "Successfully processed Pgtask by controller",
+}
+
+func (c *PgtaskController) processNextItem() bool {
+	// Wait until there is a new item in the working queue
+	key, quit := c.Queue.Get()
+	if quit {
+		return false
 	}

-	//Body(taskCopy).
+	log.Debugf("working on %s", key.(string))
+	keyParts := strings.Split(key.(string), "/")
+	keyNamespace := keyParts[0]
+	keyResourceName := keyParts[1]

-	//get pgtask
+	log.Debugf("queue got key ns=[%s] resource=[%s]", keyNamespace, keyResourceName)
+
+	// Tell the queue that we are done with processing this key. This unblocks the key for other workers
+	// This allows safe parallel processing because two pods with the same key are never processed in
+	// parallel.
+	defer c.Queue.Done(key)

 	tmpTask := crv1.Pgtask{}
-	found, err := kubeapi.Getpgtask(c.PgtaskClient, &tmpTask, task.ObjectMeta.Name, task.ObjectMeta.Namespace)
+	found, err := kubeapi.Getpgtask(c.PgtaskClient, &tmpTask, keyResourceName, keyNamespace)
 	if !found {
 		log.Errorf("ERROR onAdd getting pgtask : %s", err.Error())
-		return
+		return false
 	}

 	//update pgtask
-	tmpTask.Status = crv1.PgtaskStatus{
-		State:   crv1.PgtaskStateProcessed,
-		Message: "Successfully processed Pgtask by controller",
-	}
-
-	err = kubeapi.Updatepgtask(c.PgtaskClient, &tmpTask, task.ObjectMeta.Name, task.ObjectMeta.Namespace)
-
-	/**
-	err = c.PgtaskClient.Put().
-		Name(tmpTask.ObjectMeta.Name).
-		Namespace(tmpTask.ObjectMeta.Namespace).
-		Resource(crv1.PgtaskResourcePlural).
-		Body(tmpTask).
-		Do().
-		Error()
-
-	*/
+	state := crv1.PgtaskStateProcessed
+	message := "Successfully processed Pgtask by controller"
+	err = kubeapi.PatchpgtaskStatus(c.PgtaskClient, state, message, &tmpTask, keyNamespace)
 	if err != nil {
 		log.Errorf("ERROR onAdd updating pgtask status: %s", err.Error())
-		return
+		return false
 	}

 	//process the incoming task
-	switch task.Spec.TaskType {
+	switch tmpTask.Spec.TaskType {
+	// case crv1.PgtaskMinorUpgrade:
+	//	log.Debug("delete minor upgrade task added")
+	//	clusteroperator.AddUpgrade(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
 	case crv1.PgtaskDeletePgbouncer:
 		log.Debug("delete pgbouncer task added")
-		clusteroperator.DeletePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
+		clusteroperator.DeletePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
 	case crv1.PgtaskReconfigurePgbouncer:
 		log.Debug("Reconfiguredelete pgbouncer task added")
-		clusteroperator.ReconfigurePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
+		clusteroperator.ReconfigurePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
 	case crv1.PgtaskAddPgbouncer:
 		log.Debug("add pgbouncer task added")
-		clusteroperator.AddPgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
+		clusteroperator.AddPgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
 	case crv1.PgtaskDeletePgpool:
 		log.Debug("delete pgpool task added")
-		clusteroperator.DeletePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
+		clusteroperator.DeletePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
 	case crv1.PgtaskReconfigurePgpool:
 		log.Debug("Reconfiguredelete pgpool task added")
-		clusteroperator.ReconfigurePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
+		clusteroperator.ReconfigurePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
 	case crv1.PgtaskAddPgpool:
 		log.Debug("add pgpool task added")
-		clusteroperator.AddPgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
+		clusteroperator.AddPgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
 	case crv1.PgtaskFailover:
 		log.Debug("failover task added")
-		clusteroperator.FailoverBase(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task, c.PgtaskConfig)
+		if !dupeFailover(c.PgtaskClient, &tmpTask, keyNamespace) {
+			clusteroperator.FailoverBase(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask, c.PgtaskConfig)
+		} else {
+			log.Debug("skipping duplicate onAdd failover task %s/%s", keyNamespace, keyResourceName)
+		}

 	case crv1.PgtaskDeleteData:
 		log.Debug("delete data task added")
-		taskoperator.RemoveData(task.ObjectMeta.Namespace, c.PgtaskClientset, task)
+		if !dupeDeleteData(c.PgtaskClient, &tmpTask, keyNamespace) {
+			taskoperator.RemoveData(keyNamespace, c.PgtaskClientset, &tmpTask)
+		} else {
+			log.Debug("skipping duplicate onAdd delete data task %s/%s", keyNamespace, keyResourceName)
+		}
 	case crv1.PgtaskDeleteBackups:
 		log.Debug("delete backups task added")
-		taskoperator.RemoveBackups(task.ObjectMeta.Namespace, c.PgtaskClientset, task)
+		taskoperator.RemoveBackups(keyNamespace, c.PgtaskClientset, &tmpTask)
 	case crv1.PgtaskBackrest:
 		log.Debug("backrest task added")
-		backrestoperator.Backrest(task.ObjectMeta.Namespace, c.PgtaskClientset, task)
+		backrestoperator.Backrest(keyNamespace, c.PgtaskClientset, &tmpTask)
 	case crv1.PgtaskBackrestRestore:
 		log.Debug("backrest restore task added")
-		backrestoperator.Restore(c.PgtaskClient, task.ObjectMeta.Namespace, c.PgtaskClientset, task)
+		backrestoperator.Restore(c.PgtaskClient, keyNamespace, c.PgtaskClientset, &tmpTask)

 	case crv1.PgtaskpgDump:
 		log.Debug("pgDump task added")
-		pgdumpoperator.Dump(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task)
+		pgdumpoperator.Dump(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask)
 	case crv1.PgtaskpgRestore:
 		log.Debug("pgDump restore task added")
-		pgdumpoperator.Restore(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task)
+		pgdumpoperator.Restore(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask)

 	case crv1.PgtaskAutoFailover:
-		log.Debugf("autofailover task added %s", task.ObjectMeta.Name)
+		log.Debugf("autofailover task added %s", keyResourceName)
 	case crv1.PgtaskWorkflow:
-		log.Debugf("workflow task added [%s] ID [%s]", task.ObjectMeta.Name, task.Spec.Parameters[crv1.PgtaskWorkflowID])
+		log.Debugf("workflow task added [%s] ID [%s]", keyResourceName, tmpTask.Spec.Parameters[crv1.PgtaskWorkflowID])

 	case crv1.PgtaskBenchmark:
 		log.Debug("benchmark task added")
-		benchmarkoperator.Create(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task)
+		benchmarkoperator.Create(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask)

 	default:
-		log.Debugf("unknown task type on pgtask added [%s]", task.Spec.TaskType)
+		log.Debugf("unknown task type on pgtask added [%s]", tmpTask.Spec.TaskType)
 	}

+	return true
+
 }

+// onAdd is called when a pgtask is added
+func (c *PgtaskController) onAdd(obj interface{}) {
+	task := obj.(*crv1.Pgtask)
+	// log.Debugf("[PgtaskController] onAdd ns=%s %s", task.ObjectMeta.Namespace, task.ObjectMeta.SelfLink)
+
+	//handle the case of when the operator restarts, we do not want
+	//to process pgtasks already processed
+	if task.Status.State == crv1.PgtaskStateProcessed {
+		log.Debug("pgtask " + task.ObjectMeta.Name + " already processed")
+		return
+	}
+
+	key, err := cache.MetaNamespaceKeyFunc(obj)
+	if err == nil {
+		log.Debugf("task putting key in queue %s", key)
+		c.Queue.Add(key)
+	}
+
+}
+
+
 // onUpdate is called when a pgtask is updated
 func (c *PgtaskController) onUpdate(oldObj, newObj interface{}) {
 	task := newObj.(*crv1.Pgtask)
@@ -223,3 +242,41 @@ func (c *PgtaskController) onDelete(obj interface{}) {
 	task := obj.(*crv1.Pgtask)
 	log.Debugf("[PgtaskController] onDelete ns=%s %s", task.ObjectMeta.Namespace, task.ObjectMeta.SelfLink)
 }
+
+//de-dupe logic for a failover, if the failover started
+//parameter is set, it means a failover has already been
+//started on this
+func dupeFailover(restClient *rest.RESTClient, task *crv1.Pgtask, ns string) bool {
+	tmp := crv1.Pgtask{}
+
+	found, _ := kubeapi.Getpgtask(restClient, &tmp, task.Spec.Name, ns)
+	if !found {
+		//a big time error if this occurs
+		return false
+	}
+
+	if tmp.Spec.Parameters[util.LABEL_FAILOVER_STARTED] == "" {
+		return false
+	}
+
+	return true
+}
+
+//de-dupe logic for a delete data, if the delete data job started
+//parameter is set, it means a delete data job has already been
+//started on this
+func dupeDeleteData(restClient *rest.RESTClient, task *crv1.Pgtask, ns string) bool {
+	tmp := crv1.Pgtask{}
+
+	found, _ := kubeapi.Getpgtask(restClient, &tmp, task.Spec.Name, ns)
+	if !found {
+		//a big time error if this occurs
+		return false
+	}
+
+	if tmp.Spec.Parameters[util.LABEL_DELETE_DATA_STARTED] == "" {
+		return false
+	}
+
+	return true
+}
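Two things worth noting about the taskcontroller changes. First, dupeFailover and dupeDeleteData re-read the pgtask before acting because a queued key can be stale: if the stored task already has its failover-started (or delete-data-started) parameter set, that work is in flight and is skipped rather than repeated. Second, this diff adds RunWorker but does not show where it is launched; in the usual client-go arrangement it runs as one or more goroutines started alongside the watch. A sketch of that wiring, under the assumption (not shown in this commit) that the caller owns the controller's lifecycle:

import "context"

// runTaskWorkers is a hypothetical helper, not part of this commit: it
// starts N workers that drain the queue concurrently, then waits for
// cancellation. When Run returns, its deferred c.Queue.ShutDown() makes
// Queue.Get return quit=true, which ends every RunWorker loop.
func runTaskWorkers(ctx context.Context, c *PgtaskController, workers int) {
	for i := 0; i < workers; i++ {
		go c.RunWorker()
	}
	<-ctx.Done()
}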
