Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ RUN apt-get update && \
apt-get install git -y

COPY . /src
RUN python -m pip install -U setuptools==${SETUPTOOLS_VERSION} && \
python setup.py clean bdist_wheel
RUN python -m pip install -U setuptools==${SETUPTOOLS_VERSION} \
&& pip install setuptools_scm vcs_versioning \
&& python setup.py clean bdist_wheel


# Run container
Expand Down
99 changes: 59 additions & 40 deletions crate/operator/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
from crate.operator.utils.jwt import crate_version_supports_jwt
from crate.operator.utils.k8s_api_client import GlobalApiClient
from crate.operator.utils.kopf import StateBasedSubHandler
from crate.operator.utils.kubeapi import call_kubeapi
from crate.operator.utils.kubeapi import call_kubeapi, has_ingress_route_tcp
from crate.operator.utils.secrets import gen_password
from crate.operator.utils.typing import LabelType
from crate.operator.utils.version import CrateVersion
Expand Down Expand Up @@ -1231,59 +1231,67 @@ def get_data_service(
dns_record: Optional[str],
source_ranges: Optional[List[str]] = None,
additional_annotations: Optional[Dict] = None,
use_traefik: bool = False,
) -> V1Service:
res_annotations = {}
if config.CLOUD_PROVIDER == CloudProvider.AWS:
res_annotations.update(
{
# https://kubernetes.io/docs/concepts/services-networking/service/#connection-draining-on-aws
"service.beta.kubernetes.io/aws-load-balancer-connection-draining-enabled": "true", # noqa
"service.beta.kubernetes.io/aws-load-balancer-connection-draining-timeout": "1800", # noqa
# Default idle timeout is 60s, which kills the connection on long-running queries # noqa
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600", # noqa
"service.beta.kubernetes.io/aws-load-balancer-type": "nlb", # noqa
"service.beta.kubernetes.io/aws-load-balancer-cross-zone-load-balancing-enabled": "true", # noqa
}
)
elif config.CLOUD_PROVIDER == CloudProvider.AZURE:
# https://docs.microsoft.com/en-us/azure/aks/load-balancer-standard#additional-customizations-via-kubernetes-annotations
# https://docs.microsoft.com/en-us/azure/load-balancer/load-balancer-tcp-reset
res_annotations.update(
{
"service.beta.kubernetes.io/azure-load-balancer-disable-tcp-reset": "false", # noqa
"service.beta.kubernetes.io/azure-load-balancer-tcp-idle-timeout": "30", # noqa
}
)

if dns_record:
res_annotations.update(
{"external-dns.alpha.kubernetes.io/hostname": dns_record}
)
if not use_traefik:
if config.CLOUD_PROVIDER == CloudProvider.AWS:
res_annotations.update(
{
# https://kubernetes.io/docs/concepts/services-networking/service/#connection-draining-on-aws
"service.beta.kubernetes.io/aws-load-balancer-connection-draining-enabled": "true", # noqa
"service.beta.kubernetes.io/aws-load-balancer-connection-draining-timeout": "1800", # noqa
# Default idle timeout is 60s, which kills the connection on long-running queries # noqa
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600", # noqa
"service.beta.kubernetes.io/aws-load-balancer-type": "nlb", # noqa
"service.beta.kubernetes.io/aws-load-balancer-cross-zone-load-balancing-enabled": "true", # noqa
}
)
elif config.CLOUD_PROVIDER == CloudProvider.AZURE:
# https://docs.microsoft.com/en-us/azure/aks/load-balancer-standard#additional-customizations-via-kubernetes-annotations
# https://docs.microsoft.com/en-us/azure/load-balancer/load-balancer-tcp-reset
res_annotations.update(
{
"service.beta.kubernetes.io/azure-load-balancer-disable-tcp-reset": "false", # noqa
"service.beta.kubernetes.io/azure-load-balancer-tcp-idle-timeout": "30", # noqa
}
)

if dns_record:
res_annotations.update(
{"external-dns.alpha.kubernetes.io/hostname": dns_record}
)

if additional_annotations:
res_annotations.update(additional_annotations)

service_name = f"crate-{name}"
service_type = "ClusterIP" if use_traefik else "LoadBalancer"

spec_kwargs = dict(
ports=[
V1ServicePort(name="http", port=http_port, target_port=Port.HTTP.value),
V1ServicePort(
name="psql", port=postgres_port, target_port=Port.POSTGRES.value
),
],
selector={LABEL_COMPONENT: "cratedb", LABEL_NAME: name},
type=service_type,
)

if not use_traefik:
spec_kwargs["external_traffic_policy"] = "Local"
if source_ranges:
spec_kwargs["load_balancer_source_ranges"] = source_ranges

return V1Service(
metadata=V1ObjectMeta(
annotations=res_annotations,
labels=labels,
name=service_name,
name=f"crate-{name}",
owner_references=owner_references,
),
spec=V1ServiceSpec(
ports=[
V1ServicePort(name="http", port=http_port, target_port=Port.HTTP.value),
V1ServicePort(
name="psql", port=postgres_port, target_port=Port.POSTGRES.value
),
],
selector={LABEL_COMPONENT: "cratedb", LABEL_NAME: name},
type="LoadBalancer",
external_traffic_policy="Local",
load_balancer_source_ranges=source_ranges if source_ranges else None,
),
spec=V1ServiceSpec(**spec_kwargs),
)


Expand Down Expand Up @@ -1366,6 +1374,7 @@ async def create_services(
logger: logging.Logger,
source_ranges: Optional[List[str]] = None,
additional_annotations: Optional[Dict] = None,
use_traefik: bool = False,
) -> None:
async with GlobalApiClient() as api_client:
core = CoreV1Api(api_client)
Expand All @@ -1380,6 +1389,7 @@ async def create_services(
dns_record,
source_ranges,
additional_annotations=additional_annotations,
use_traefik=use_traefik,
)
await call_kubeapi(
core.create_namespaced_service,
Expand Down Expand Up @@ -1434,6 +1444,14 @@ async def recreate_services(
)
dns_record = spec.get("cluster", {}).get("externalDNS", None)

use_traefik = await has_ingress_route_tcp(namespace)
logger.info(f"Traefik detected: {use_traefik}")
if use_traefik:
logger.info(
f"IngressRouteTCP detected in namespace '{namespace}', "
"recreating data service as ClusterIP"
)

await create_services(
owner_references,
namespace,
Expand All @@ -1446,6 +1464,7 @@ async def recreate_services(
logger,
source_ranges,
additional_annotations,
use_traefik=use_traefik,
)


Expand Down
18 changes: 13 additions & 5 deletions crate/operator/handlers/handle_update_allowed_cidrs.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,19 @@ async def update_service_allowed_cidrs(
if not service:
return

await core.patch_namespaced_service(
name=f"crate-{name}",
namespace=namespace,
body={"spec": {"loadBalancerSourceRanges": change.new}},
)
# Only patch loadBalancerSourceRanges if the service is of type LoadBalancer.
# For ClusterIP this field is forbidden and will cause a 422.
if service.spec.type == "LoadBalancer":
await core.patch_namespaced_service(
name=f"crate-{name}",
namespace=namespace,
body={"spec": {"loadBalancerSourceRanges": change.new}},
)
else:
logger.info(
f"Skipping loadBalancerSourceRanges patch: service 'crate-{name}' "
f"is of type '{service.spec.type}', not 'LoadBalancer'."
)

ingress = await read_grand_central_ingress(namespace=namespace, name=name)

Expand Down
7 changes: 6 additions & 1 deletion crate/operator/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
get_cratedb_resource,
get_host,
get_system_user_password,
has_ingress_route_tcp,
)
from crate.operator.utils.notifications import send_operation_progress_notification
from crate.operator.webhooks import (
Expand Down Expand Up @@ -631,7 +632,11 @@ async def suspend_or_start_cluster(
await recreate_services(
namespace, name, cratedb["spec"], cratedb["metadata"], logger
)
if not await is_lb_service_ready(core, namespace, name):
use_traefik = await has_ingress_route_tcp(namespace)
logger.info(f"Traefik detected: {use_traefik}")
if not use_traefik and not await is_lb_service_ready(
core, namespace, name
):
raise TemporaryError(delay=config.BOOTSTRAP_RETRY_DELAY)

index_path, *_ = field_path
Expand Down
22 changes: 22 additions & 0 deletions crate/operator/utils/kubeapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,25 @@ async def get_gc_user_password(core: CoreV1Api, namespace: str, name: str) -> st
namespace,
{"key": "password", "name": GC_USER_SECRET_NAME.format(name=name)},
)


async def has_ingress_route_tcp(namespace: str) -> bool:
"""
Returns True if any IngressRouteTCP resource exists in the given namespace,
indicating the cluster uses Traefik instead of a cloud LoadBalancer.
"""
async with GlobalApiClient() as api_client:
coapi = CustomObjectsApi(api_client)
try:
result = await coapi.list_namespaced_custom_object(
group="traefik.io",
version="v1alpha1",
namespace=namespace,
plural="ingressroutetcps",
)
return len(result.get("items", [])) > 0
except ApiException as e:
if e.status == 404:
# CRD doesn't exist on this cluster
return False
raise
6 changes: 6 additions & 0 deletions deploy/charts/crate-operator/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ rules:
- patch
- update
- watch
- apiGroups:
- traefik.io
resources:
- ingressroutetcps
verbs:
- list

---
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
6 changes: 6 additions & 0 deletions deploy/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ rules:
- patch
- update
- watch
- apiGroups:
- traefik.io
resources:
- ingressroutetcps
verbs:
- list

---
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
Loading