@@ -18,6 +18,8 @@ limitations under the License.
1818import (
1919 "context"
2020 "fmt"
21+ "strings"
22+ "io/ioutil"
2123 log "github.com/sirupsen/logrus"
2224 crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1"
2325 "github.com/crunchydata/postgres-operator/kubeapi"
@@ -29,20 +31,25 @@ import (
2931 "k8s.io/client-go/kubernetes"
3032 "k8s.io/client-go/rest"
3133 "k8s.io/client-go/tools/cache"
34+ "k8s.io/client-go/util/workqueue"
3235)
3336
3437// PgclusterController holds the connections for the controller
3538type PgclusterController struct {
3639 PgclusterClient * rest.RESTClient
3740 PgclusterScheme * runtime.Scheme
3841 PgclusterClientset * kubernetes.Clientset
42+ Queue workqueue.RateLimitingInterface
3943 Namespace string
4044}
4145
4246// Run starts an pgcluster resource controller
4347func (c * PgclusterController ) Run (ctx context.Context ) error {
4448 log .Debug ("Watch Pgcluster objects" )
4549
50+ //shut down the work queue to cause workers to end
51+ defer c .Queue .ShutDown ()
52+
4653 _ , err := c .watchPgclusters (ctx )
4754 if err != nil {
4855 log .Errorf ("Failed to register watch for Pgcluster resource: %v" , err )
@@ -95,34 +102,82 @@ func (c *PgclusterController) onAdd(obj interface{}) {
95102 return
96103 }
97104
98- // NEVER modify objects from the store. It's a read-only, local cache.
99- // You can use clusterScheme.Copy() to make a deep copy of original object and modify this copy
100- // Or create a copy manually for better performance
101- copyObj := cluster .DeepCopyObject ()
102- clusterCopy := copyObj .(* crv1.Pgcluster )
105+ key , err := cache .MetaNamespaceKeyFunc (obj )
106+ if err == nil {
107+ log .Debugf ("cluster putting key in queue %s" , key )
108+ c .Queue .Add (key )
109+ }
110+
111+ }
112+
113+
114+ func (c * PgclusterController ) RunWorker () {
115+
116+ //process the 'add' work queue forever
117+ for c .processNextItem () {
118+ }
119+ }
103120
104- clusterCopy .Status = crv1.PgclusterStatus {
105- State : crv1 .PgclusterStateProcessed ,
106- Message : "Successfully processed Pgcluster by controller" ,
121+ func (c * PgclusterController ) processNextItem () bool {
122+ // Wait until there is a new item in the working queue
123+ key , quit := c .Queue .Get ()
124+ if quit {
125+ return false
107126 }
108127
109- err := c .PgclusterClient .Put ().
110- Name (cluster .ObjectMeta .Name ).
111- Namespace (cluster .ObjectMeta .Namespace ).
112- Resource (crv1 .PgclusterResourcePlural ).
113- Body (clusterCopy ).
114- Do ().
115- Error ()
128+ log .Debugf ("working on %s" , key .(string ))
129+ keyParts := strings .Split (key .(string ), "/" )
130+ keyNamespace := keyParts [0 ]
131+ keyResourceName := keyParts [1 ]
132+
133+ log .Debugf ("cluster add queue got key ns=[%s] resource=[%s]" , keyNamespace , keyResourceName )
134+
135+ // Tell the queue that we are done with processing this key. This unblocks the key for other workers
136+ // This allows safe parallel processing because two pods with the same key are never processed in
137+ // parallel.
138+ defer c .Queue .Done (key )
116139
140+ // Invoke the method containing the business logic
141+ // for pgbackups, the convention is the CRD name is always
142+ // the same as the pg-cluster label value
143+
144+ // in this case, the de-dupe logic is to test whether a cluster
145+ // deployment exists , if so, then we don't create another
146+ _ , found , err := kubeapi .GetDeployment (c .PgclusterClientset , keyResourceName , keyNamespace )
147+
148+ if found {
149+ log .Debugf ("cluster add - dep already found, not creating again" )
150+ return true
151+ }
152+
153+ //get the pgcluster
154+ cluster := crv1.Pgcluster {}
155+ found , err = kubeapi .Getpgcluster (c .PgclusterClient , & cluster , keyResourceName , keyNamespace )
156+ if ! found {
157+ log .Debugf ("cluster add - pgcluster not found, this is invalid" )
158+ return false
159+ }
160+
161+
162+ addIdentifier (& cluster )
163+
164+ state := crv1 .PgclusterStateProcessed
165+ message := "Successfully processed Pgcluster by controller"
166+
167+ err = kubeapi .PatchpgclusterStatus (c .PgclusterClient , state , message , & cluster , keyNamespace )
117168 if err != nil {
118169 log .Errorf ("ERROR updating pgcluster status on add: %s" , err .Error ())
170+ return false
119171 }
120172
121173 log .Debugf ("pgcluster added: %s" , cluster .ObjectMeta .Name )
122174
123- clusteroperator .AddClusterBase (c .PgclusterClientset , c .PgclusterClient , clusterCopy , cluster .ObjectMeta .Namespace )
175+ clusteroperator .AddClusterBase (c .PgclusterClientset , c .PgclusterClient , & cluster , cluster .ObjectMeta .Namespace )
176+
177+ return true
124178}
125179
180+
126181// onUpdate is called when a pgcluster is updated
127182func (c * PgclusterController ) onUpdate (oldObj , newObj interface {}) {
128183 oldcluster := oldObj .(* crv1.Pgcluster )
@@ -212,3 +267,12 @@ func getReadyStatus(pod *v1.Pod) (string, bool) {
212267 return fmt .Sprintf ("%d/%d" , readyCount , containerCount ), equal
213268
214269}
270+
271+ func addIdentifier (clusterCopy * crv1.Pgcluster ) {
272+ u , err := ioutil .ReadFile ("/proc/sys/kernel/random/uuid" )
273+ if err != nil {
274+ log .Error (err )
275+ }
276+
277+ clusterCopy .ObjectMeta .Labels [util .LABEL_PG_CLUSTER_IDENTIFIER ] = string (u [:len (u )- 1 ])
278+ }
0 commit comments