Skip to content

Commit 75ec46c

Browse files
committed
Updates the cluster initialization process as needed to
support pgBackRest replica creation. This means that in addition to creating the pgBackRest stanza following initialization of the primary, a full backup is also taken (the backup is executed following a successful stanza creation). This full backup can then be utilized to initialize any replicas in the cluster using the pgBackRest initialization process included in the containers. In order to support this initialization process, any pgreplica CR's created prior to the successful initialization of the primary are placed in a pending state until the cluster is fully initialized (i.e. the stanza and full backup have been created). Then, once the cluster is initialized, and update to the pgreplica's is triggered via an annotation, triggering the creation of the replicas. A 'pgcluster Initialized' state is now included for the pgcluster CR, and is therefore utilized to determine if the cluster is fully initialized (e.g. in order to trigger replica creation). Also included are updates to the pgBackRest restore workflow as needed to support the new crunchy-postgres-ha and crunchy-postgres-gis containers. Now when a restore is initiated, all replicas are deleted in addition to the primary. Then, once the restored primary DB comes back online, the replicas are recreated using any existing pgreplica CR's. This essentially establishes a fully working cluster following a successful restore, which includes all desired replicas and autofailover enabled by default. Additionally, it should be noted that any PVC's previously utilized by those replicas are reused for those same replicas in the restored cluster. Also, the pgcluster CR now shows a 'pgcluster Restoring' state during a restore, and any pgreplica will show a "pgreplica Pending restore" state while the restore is underway.
1 parent 8c40541 commit 75ec46c

File tree

15 files changed

+455
-88
lines changed

15 files changed

+455
-88
lines changed

apis/cr/v1/cluster.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,8 @@ const (
8787
PgclusterStateCreated PgclusterState = "pgcluster Created"
8888
// PgclusterStateProcessed ...
8989
PgclusterStateProcessed PgclusterState = "pgcluster Processed"
90+
// PgclusterStateInitialized ...
91+
PgclusterStateInitialized PgclusterState = "pgcluster Initialized"
92+
// PgclusterStateRestore ...
93+
PgclusterStateRestore PgclusterState = "pgcluster Restoring"
9094
)

apis/cr/v1/replica.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ type PgreplicaState string
6363
const (
6464
// PgreplicaStateCreated ...
6565
PgreplicaStateCreated PgreplicaState = "pgreplica Created"
66+
// PgreplicaStatePendingInit ...
67+
PgreplicaStatePendingInit PgreplicaState = "pgreplica Pending init"
68+
// PgreplicaStatePendingRestore ...
69+
PgreplicaStatePendingRestore PgreplicaState = "pgreplica Pending restore"
6670
// PgreplicaStateProcessed ...
6771
PgreplicaStateProcessed PgreplicaState = "pgreplica Processed"
72+
6873
)

conf/postgres-operator/cluster-deployment.json

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"name": "{{.Name}}",
66
"labels": {
77
"vendor": "crunchydata",
8+
"pgo-pg-database": "true",
89
{{.DeploymentLabels }}
910
}
1011
},
@@ -21,6 +22,7 @@
2122
"labels": {
2223
"name": "{{.Name}}",
2324
"vendor": "crunchydata",
25+
"pgo-pg-database": "true",
2426
{{.PodLabels }}
2527
}
2628
},
@@ -37,8 +39,11 @@
3739
"readinessProbe": {
3840
"exec": {
3941
"command": [
40-
"/opt/cpm/bin/readiness/readiness.sh"
41-
]
42+
"/bin/bash",
43+
"-c",
44+
"[[ -f '/crunchyadm/pgha_initialized' ]]",
45+
"&& pg_isready -h /crunchyadm -U crunchyready"
46+
]
4247
},
4348
"initialDelaySeconds": 15,
4449
"timeoutSeconds": 8
@@ -52,8 +57,19 @@
5257
}, {
5358
"name": "PGHA_USER",
5459
"value": "postgres"
60+
},
61+
{{if .IsInit}}
62+
{
63+
"name": "PGHA_INIT",
64+
"valueFrom": {
65+
"configMapKeyRef": {
66+
"name": "{{.ClusterName}}-pgha-default-config",
67+
"key": "init"
68+
}
69+
}
5570
},
56-
{{if .IsReplica}}
71+
{{ end }}
72+
{{if not .IsInit}}
5773
{
5874
"name": "PGHA_PRIMARY_HOST",
5975
"value": "{{.ClusterName}}"
@@ -75,26 +91,19 @@
7591
"fieldPath": "metadata.namespace"
7692
}
7793
}
78-
}, {
79-
"name": "PATRONI_NAME",
80-
"valueFrom": {
81-
"fieldRef": {
82-
"fieldPath": "metadata.name"
83-
}
84-
}
85-
}, {
94+
}, {
95+
"name": "PATRONI_KUBERNETES_SCOPE_LABEL",
96+
"value": "{{.ScopeLabel}}"
97+
}, {
8698
"name": "PATRONI_SCOPE",
8799
"valueFrom": {
88100
"fieldRef": {
89-
"fieldPath": "metadata.labels['pg-cluster']"
101+
"fieldPath": "metadata.labels['{{.ScopeLabel}}']"
90102
}
91103
}
92104
}, {
93105
"name": "PATRONI_KUBERNETES_LABELS",
94106
"value": "{vendor: \"crunchydata\"}"
95-
}, {
96-
"name": "PATRONI_KUBERNETES_SCOPE_LABEL",
97-
"value": "{{.ScopeLabel}}"
98107
}, {
99108
"name": "PATRONI_LOG_LEVEL",
100109
"value": "INFO"
@@ -205,7 +214,7 @@
205214
{{end}}
206215
{
207216
"configMap": {
208-
"name": "pgo-pgha-default-config",
217+
"name": "{{.ClusterName}}-pgha-default-config",
209218
"optional": true
210219
}
211220
}

conf/postgres-operator/cluster-service.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,16 @@
4444
}
4545
],
4646
"selector": {
47+
{{ if or (eq .Name .ClusterName) (eq .Name (printf "%s%s" .ClusterName "-replica")) }}
48+
"pg-cluster": "{{.ClusterName}}",
49+
{{ if eq .Name (printf "%s%s" .ClusterName "-replica") }}
50+
"role": "replica"
51+
{{else}}
52+
"role": "master"
53+
{{end}}
54+
{{else}}
4755
"service-name": "{{.ServiceName}}"
56+
{{end}}
4857
},
4958
"type": "{{.ServiceType}}",
5059
"sessionAffinity": "None"

config/annotations.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package config
2+
3+
/*
4+
Copyright 2019 Crunchy Data Solutions, Inc.
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
// annotations used by the operator
19+
const ANNOTATION_PGHA_BOOTSTRAP_REPLICA = "pgo-pgha-bootstrap-replica"

config/labels.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const LABEL_SELECTOR = "selector"
2121
const LABEL_OPERATOR = "postgres-operator"
2222
const LABEL_PG_CLUSTER = "pg-cluster"
2323
const LABEL_PG_CLUSTER_IDENTIFIER = "pg-cluster-id"
24+
const LABEL_PG_DATABASE = "pgo-pg-database"
2425

2526
const LABEL_PGBACKUP = "pgbackup"
2627
const LABEL_PGTASK = "pg-task"
@@ -178,3 +179,7 @@ const LABEL_PGO_DEFAULT_SC = "pgo-default-sc"
178179
const LABEL_FAILOVER_STARTED = "failover-started"
179180

180181
const GLOBAL_CUSTOM_CONFIGMAP = "pgo-custom-pg-config"
182+
183+
const LABEL_PGHA_SCOPE = "crunchy-pgha-scope"
184+
const LABEL_PGHA_DEFAULT_CONFIGMAP = "pgha-default-config"
185+
const LABEL_PGHA_BOOTSTRAP_BACKUP = "pgha-bootstrap-backup"

controller/jobcontroller.go

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/crunchydata/postgres-operator/util"
3535
log "github.com/sirupsen/logrus"
3636
apiv1 "k8s.io/api/batch/v1"
37+
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3738
"k8s.io/apimachinery/pkg/fields"
3839
"k8s.io/client-go/kubernetes"
3940
"k8s.io/client-go/rest"
@@ -100,6 +101,15 @@ func (c *JobController) onUpdate(oldObj, newObj interface{}) {
100101

101102
log.Debugf("[JobController] onUpdate ns=%s %s active=%d succeeded=%d conditions=[%v]", job.ObjectMeta.Namespace, job.ObjectMeta.SelfLink, job.Status.Active, job.Status.Succeeded, job.Status.Conditions)
102103

104+
// determine if a foreground deletion of this resource is in progress
105+
isForegroundDeletion := false
106+
for _, finalizer := range job.Finalizers {
107+
if finalizer == meta_v1.FinalizerDeleteDependents {
108+
isForegroundDeletion = true
109+
break
110+
}
111+
}
112+
103113
var err error
104114

105115
//handle the case of rmdata jobs succeeding
@@ -250,10 +260,74 @@ func (c *JobController) onUpdate(oldObj, newObj interface{}) {
250260
log.Error("error in patching pgtask " + job.ObjectMeta.SelfLink + err.Error())
251261
}
252262
publishBackupComplete(labels[config.LABEL_PG_CLUSTER], job.ObjectMeta.Labels[config.LABEL_PG_CLUSTER_IDENTIFIER], job.ObjectMeta.Labels[config.LABEL_PGOUSER], "pgbackrest", job.ObjectMeta.Namespace, "")
263+
264+
// if initial cluster backup, now annotate all existing pgreplica's to initiate replica creation
265+
pgreplicaList := &crv1.PgreplicaList{}
266+
selector := config.LABEL_PG_CLUSTER+"="+labels[config.LABEL_PG_CLUSTER]
267+
if labels[config.LABEL_PGHA_BOOTSTRAP_BACKUP] == "true" {
268+
log.Debugf("jobController onUpdate initial backup complete")
269+
270+
// get the pgcluster resource for the cluster the replica is a part of
271+
cluster := crv1.Pgcluster{}
272+
_, err = kubeapi.Getpgcluster(c.JobClient, &cluster, labels[config.LABEL_PG_CLUSTER],
273+
job.ObjectMeta.Namespace)
274+
if err != nil {
275+
log.Error(err)
276+
return
277+
}
278+
message := "Cluster has been initialized"
279+
err = kubeapi.PatchpgclusterStatus(c.JobClient, crv1.PgclusterStateInitialized, message,
280+
&cluster, job.ObjectMeta.Namespace)
281+
if err != nil {
282+
log.Error(err)
283+
return
284+
}
285+
286+
287+
err := kubeapi.GetpgreplicasBySelector(c.JobClient, pgreplicaList, selector, job.ObjectMeta.Namespace)
288+
if err != nil {
289+
log.Error(err)
290+
return
291+
}
292+
for _, pgreplica := range pgreplicaList.Items {
293+
if pgreplica.Annotations == nil {
294+
pgreplica.Annotations = make(map[string]string)
295+
}
296+
pgreplica.Annotations[config.ANNOTATION_PGHA_BOOTSTRAP_REPLICA] = "true"
297+
err = kubeapi.Updatepgreplica(c.JobClient, &pgreplica, pgreplica.Name, job.ObjectMeta.Namespace)
298+
if err != nil {
299+
log.Error(err)
300+
return
301+
}
302+
}
303+
}
253304
}
254-
255305
return
306+
}
256307

308+
// create an initial full backup for replica creation once stanza creation is complete
309+
if !isForegroundDeletion && labels[config.LABEL_BACKREST] == "true" &&
310+
labels[config.LABEL_BACKREST_COMMAND] == crv1.PgtaskBackrestStanzaCreate {
311+
log.Debugf("jobController onUpdate backrest stanza-create job case")
312+
if job.Status.Succeeded == 1 {
313+
log.Debugf("backrest stanza successfully created for cluster %s", labels[config.LABEL_PG_CLUSTER])
314+
log.Debugf("proceeding with the initial full backup for cluster %s as needed for replica creation",
315+
labels[config.LABEL_PG_CLUSTER])
316+
317+
var backrestRepoPodName string
318+
for _, cont := range job.Spec.Template.Spec.Containers {
319+
for _, envVar := range cont.Env {
320+
if envVar.Name == "PODNAME" {
321+
backrestRepoPodName = envVar.Value
322+
log.Debugf("the backrest repo pod for the initial backup of cluster %s is %s",
323+
labels[config.LABEL_PG_CLUSTER], backrestRepoPodName)
324+
}
325+
}
326+
}
327+
backrestoperator.CreateInitialBackup(c.JobClient, job.ObjectMeta.Namespace,
328+
labels[config.LABEL_PG_CLUSTER], backrestRepoPodName)
329+
}
330+
return
257331
}
258332

259333
if labels[config.LABEL_PGBASEBACKUP_RESTORE] == "true" {

controller/podcontroller.go

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,37 @@ func (c *PodController) checkReadyStatus(oldpod, newpod *apiv1.Pod, cluster *crv
194194
if v.Name == "database" {
195195
//see if there are pgtasks for adding a policy
196196
if oldDatabaseStatus == false && v.Ready {
197+
198+
// update pgreplica status if performing a restore
199+
if cluster.Status.State == crv1.PgclusterStateRestore {
200+
pgreplicaList := &crv1.PgreplicaList{}
201+
selector := config.LABEL_PG_CLUSTER+"="+clusterName
202+
log.Debugf("Restored cluster %s went to ready, patching replicas", clusterName)
203+
err := kubeapi.GetpgreplicasBySelector(c.PodClient, pgreplicaList, selector, newpod.ObjectMeta.Namespace)
204+
if err != nil {
205+
log.Error(err)
206+
return
207+
}
208+
message := "Cluster has been initialized"
209+
err = kubeapi.PatchpgclusterStatus(c.PodClient, crv1.PgclusterStateInitialized, message, cluster,
210+
newpod.ObjectMeta.Namespace)
211+
if err != nil {
212+
log.Error(err)
213+
return
214+
}
215+
for _, pgreplica := range pgreplicaList.Items {
216+
pgreplica.Spec.Status = "restore"
217+
err = kubeapi.Updatepgreplica(c.PodClient, &pgreplica, pgreplica.Name, newpod.ObjectMeta.Namespace)
218+
if err != nil {
219+
log.Error(err)
220+
return
221+
}
222+
}
223+
return
224+
}
225+
226+
operator.UpdatePghaDefaultConfigInitFlag(c.PodClientset, false, clusterName, newpod.ObjectMeta.Namespace)
227+
197228
log.Debugf("%s went to Ready from Not Ready, apply policies...", clusterName)
198229
taskoperator.ApplyPolicies(clusterName, c.PodClientset, c.PodClient, newpod.ObjectMeta.Namespace)
199230

@@ -408,8 +439,8 @@ func publishClusterComplete(clusterName, namespace string, cluster *crv1.Pgclust
408439
Timestamp: time.Now(),
409440
EventType: events.EventCreateClusterCompleted,
410441
},
411-
Clustername: clusterName,
412-
WorkflowID: cluster.Spec.UserLabels[config.LABEL_WORKFLOW_ID],
442+
Clustername: clusterName,
443+
WorkflowID: cluster.Spec.UserLabels[config.LABEL_WORKFLOW_ID],
413444
}
414445

415446
err := events.Publish(f)

controller/replicacontroller.go

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,33 @@ func (c *PgreplicaController) processNextItem() bool {
126126
log.Error(err)
127127
return false
128128
}
129-
clusteroperator.ScaleBase(c.PgreplicaClientset, c.PgreplicaClient, &replica, replica.ObjectMeta.Namespace)
130129

131-
state := crv1.PgreplicaStateProcessed
132-
message := "Successfully processed Pgreplica by controller"
133-
err = kubeapi.PatchpgreplicaStatus(c.PgreplicaClient, state, message, &replica, replica.ObjectMeta.Namespace)
130+
// get the pgcluster resource for the cluster the replica is a part of
131+
cluster := crv1.Pgcluster{}
132+
_, err = kubeapi.Getpgcluster(c.PgreplicaClient, &cluster, replica.Spec.ClusterName, keyNamespace)
134133
if err != nil {
135-
log.Errorf("ERROR updating pgreplica status: %s", err.Error())
134+
log.Error(err)
135+
return false
136+
}
137+
138+
// only process pgreplica if cluster has been initialized
139+
if cluster.Status.State == crv1.PgclusterStateInitialized {
140+
clusteroperator.ScaleBase(c.PgreplicaClientset, c.PgreplicaClient, &replica, replica.ObjectMeta.Namespace)
141+
142+
state := crv1.PgreplicaStateProcessed
143+
message := "Successfully processed Pgreplica by controller"
144+
err = kubeapi.PatchpgreplicaStatus(c.PgreplicaClient, state, message, &replica, replica.ObjectMeta.Namespace)
145+
if err != nil {
146+
log.Errorf("ERROR updating pgreplica status: %s", err.Error())
147+
}
148+
} else {
149+
150+
state := crv1.PgreplicaStatePendingInit
151+
message := "Pgreplica processing pending the creation of the initial backup"
152+
err = kubeapi.PatchpgreplicaStatus(c.PgreplicaClient, state, message, &replica, replica.ObjectMeta.Namespace)
153+
if err != nil {
154+
log.Errorf("ERROR updating pgreplica status: %s", err.Error())
155+
}
136156
}
137157

138158
//no error, tell the queue to stop tracking history
@@ -163,9 +183,34 @@ func (c *PgreplicaController) onAdd(obj interface{}) {
163183

164184
// onUpdate is called when a pgreplica is updated
165185
func (c *PgreplicaController) onUpdate(oldObj, newObj interface{}) {
166-
//newExample := newObj.(*crv1.Pgreplica)
167-
//log.Debugf("[PgreplicaController] ns=%s %s ", newExample.ObjectMeta.Namespace, newExample.ObjectMeta.Name)
186+
187+
newPgreplica := newObj.(*crv1.Pgreplica)
188+
189+
log.Debugf("[PgreplicaController] onUpdate ns=%s %s", newPgreplica.ObjectMeta.Namespace,
190+
newPgreplica.ObjectMeta.SelfLink)
168191

192+
// get the pgcluster resource for the cluster the replica is a part of
193+
cluster := crv1.Pgcluster{}
194+
_, err := kubeapi.Getpgcluster(c.PgreplicaClient, &cluster, newPgreplica.Spec.ClusterName,
195+
newPgreplica.ObjectMeta.Namespace)
196+
if err != nil {
197+
log.Error(err)
198+
return
199+
}
200+
201+
// only process pgreplica if cluster has been initialized
202+
if cluster.Status.State == crv1.PgclusterStateInitialized && newPgreplica.Spec.Status != "complete" {
203+
clusteroperator.ScaleBase(c.PgreplicaClientset, c.PgreplicaClient, newPgreplica,
204+
newPgreplica.ObjectMeta.Namespace)
205+
206+
state := crv1.PgreplicaStateProcessed
207+
message := "Successfully processed Pgreplica by controller"
208+
err := kubeapi.PatchpgreplicaStatus(c.PgreplicaClient, state, message, newPgreplica,
209+
newPgreplica.ObjectMeta.Namespace)
210+
if err != nil {
211+
log.Errorf("ERROR updating pgreplica status: %s", err.Error())
212+
}
213+
}
169214
}
170215

171216
// onDelete is called when a pgreplica is deleted

0 commit comments

Comments
 (0)