Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: no forwarding if primary url own #1038

Merged
merged 1 commit into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions src/karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from karapace.config import Config
from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from karapace.typing import SchemaReaderStoppper
from karapace.typing import PrimaryInfo, SchemaReaderStoppper
from threading import Thread
from typing import Final

Expand Down Expand Up @@ -157,18 +157,26 @@ def get_coordinator_status(self) -> SchemaCoordinatorStatus:
group_generation_id=generation if generation is not None else -1,
)

def get_master_info(self) -> tuple[bool | None, str | None]:
def get_master_info(self) -> PrimaryInfo:
"""Return whether we're the master, and the actual master url that can be used if we're not"""
if not self._sc:
return False, None
return PrimaryInfo(False, None)

if not self._sc.ready():
# we should wait for a while after we have been elected master, we should also consume
# all the messages in the log before proceeding, check the doc of `self._sc.are_we_master`
# for more details
return False, None
return PrimaryInfo(False, None)

return self._sc.are_we_master(), self._sc.master_url
url: str | None = None
if (
self._sc.master_url is not None
and f"{self.config['host']}:{self.config['port']}" not in self._sc.master_url
and f"{self.config['advertised_hostname']}:{self.config['advertised_port']}" not in self._sc.master_url
):
url = self._sc.master_url

return PrimaryInfo(self._sc.are_we_master(), url)

def __send_close_event(self) -> None:
self._closing.set()
Expand Down
5 changes: 3 additions & 2 deletions src/karapace/coordinator/schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def __init__(
def is_master_assigned_to_myself(self) -> bool:
return self._are_we_master or False

def are_we_master(self) -> bool | None:
def are_we_master(self) -> bool:
"""
After a new election is made, we should wait for a while since the previous master could have produced
a new message shortly before being disconnected from the cluster.
Expand All @@ -211,7 +211,7 @@ def are_we_master(self) -> bool | None:
# `self._are_we_master` is `None` only during the perform of the assignment
# where we don't know if we are master yet
LOG.warning("No new elections performed yet.")
return None
return False

if not self._ready or not self._schema_reader_stopper.ready():
return False
Expand Down Expand Up @@ -522,6 +522,7 @@ def coordinator_dead(self) -> None:
self._are_we_master = False
self.coordinator_id = None
self._coordinator_dead_fut.set_result(None)
self.request_rejoin()

def reset_generation(self) -> None:
"""Coordinator did not recognize either generation or member_id. Will
Expand Down
4 changes: 2 additions & 2 deletions src/karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,12 +393,12 @@ def handle_messages(self) -> None:

watch_offsets = False
if self.master_coordinator is not None:
are_we_master, _ = self.master_coordinator.get_master_info()
primary_info = self.master_coordinator.get_master_info()
# keep old behavior for True. When the primary flag is False, then we are either a follower or this particular
# node is still waiting for a stable value; in both cases we should not accept direct writes, and any
# messages off the topic are writes performed by another node
# Also if master_eligibility is disabled by configuration, disable writes too
if are_we_master is True:
if primary_info.primary:
watch_offsets = True

self.consume_messages(msgs, watch_offsets)
Expand Down
28 changes: 14 additions & 14 deletions src/karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner
from karapace.schema_reader import KafkaSchemaReader
from karapace.schema_references import LatestVersionReference, Reference
from karapace.typing import JsonObject, Mode, SchemaId, Subject, Version
from karapace.typing import JsonObject, Mode, PrimaryInfo, SchemaId, Subject, Version

import asyncio
import logging
Expand Down Expand Up @@ -85,23 +85,23 @@ async def close(self) -> None:
stack.enter_context(closing(self.schema_reader))
stack.enter_context(closing(self.producer))

async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | None]:
async def get_master(self) -> PrimaryInfo:
"""Resolve if current node is the primary and the primary node address.

:param bool ignore_readiness: Ignore waiting to become ready and return
follower/primary state and primary url.
:return (bool, Optional[str]): returns the primary/follower state and primary url
:return PrimaryInfo: returns the PrimaryInfo object with primary state and primary url.
"""
async with self._master_lock:
while True:
are_we_master, master_url = self.mc.get_master_info()
if are_we_master is None:
LOG.info("No master set: %r, url: %r", are_we_master, master_url)
elif not ignore_readiness and self.schema_reader.ready() is False:
LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready)
else:
return are_we_master, master_url
await asyncio.sleep(1.0)
primary_info = self.mc.get_master_info()
if (
# If node is not primary and no known primary url
not primary_info.primary
and primary_info.primary_url is None
):
LOG.warning("No master set: %r", primary_info)
if self.schema_reader.ready() is False:
LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready)
return PrimaryInfo(False, primary_url=primary_info.primary_url)
return primary_info

def get_compatibility_mode(self, subject: Subject) -> CompatibilityModes:
compatibility = self.database.get_subject_compatibility(subject=subject)
Expand Down
79 changes: 39 additions & 40 deletions src/karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,20 +145,12 @@ async def _forward_if_not_ready_to_serve(self, request: HTTPRequest, content_typ
pass
else:
# Not ready, still loading the state.
# Needs only the master_url
_, master_url = await self.schema_registry.get_master(ignore_readiness=True)
primary_info = await self.schema_registry.get_master()
returned_content_type = request.get_header("Content-Type") if content_type is None else content_type
if not master_url:
if not primary_info.primary_url:
self.no_master_error(request.content_type)
elif f"{self.config['advertised_hostname']}:{self.config['advertised_port']}" in master_url:
# If master url is the same as the url of this Karapace respond 503.
self.r(
body="",
content_type=returned_content_type,
status=HTTPStatus.SERVICE_UNAVAILABLE,
)
else:
url = f"{master_url}{request.url.path}"
url = f"{primary_info.primary_url}{request.url.path}"
await self._forward_request_remote(
request=request,
body=request.json,
Expand Down Expand Up @@ -574,13 +566,13 @@ async def config_set(self, content_type: str, *, request: HTTPRequest, user: Use
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)

are_we_master, master_url = await self.schema_registry.get_master()
if are_we_master:
primary_info = await self.schema_registry.get_master()
if primary_info.primary:
self.schema_registry.send_config_message(compatibility_level=compatibility_level, subject=None)
elif not master_url:
elif not primary_info.primary_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/config"
url = f"{primary_info.primary_url}/config"
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="PUT")

self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type)
Expand Down Expand Up @@ -645,13 +637,13 @@ async def config_subject_set(
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)

are_we_master, master_url = await self.schema_registry.get_master()
if are_we_master:
primary_info = await self.schema_registry.get_master()
if primary_info.primary:
self.schema_registry.send_config_message(compatibility_level=compatibility_level, subject=subject)
elif not master_url:
elif not primary_info.primary_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/config/{subject}"
url = f"{primary_info.primary_url}/config/{subject}"
await self._forward_request_remote(
request=request, body=request.json, url=url, content_type=content_type, method="PUT"
)
Expand All @@ -670,13 +662,13 @@ async def config_subject_delete(
if not self._auth.check_authorization(user, Operation.Write, f"Subject:{subject}"):
self.r(body={"message": "Forbidden"}, content_type=JSON_CONTENT_TYPE, status=HTTPStatus.FORBIDDEN)

are_we_master, master_url = await self.schema_registry.get_master()
if are_we_master:
primary_info = await self.schema_registry.get_master()
if primary_info.primary:
self.schema_registry.send_config_subject_delete_message(subject=subject)
elif not master_url:
elif not primary_info.primary_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/config/{subject}"
url = f"{primary_info.primary_url}/config/{subject}"
await self._forward_request_remote(
request=request, body=request.json, url=url, content_type=content_type, method="PUT"
)
Expand All @@ -685,21 +677,24 @@ async def config_subject_delete(

async def master_available(self, *, request: HTTPRequest) -> None:
no_cache_header = {"Cache-Control": "no-store, no-cache, must-revalidate"}
are_we_master, master_url = await self.schema_registry.get_master()
self.log.info("are master %s, master url %s", are_we_master, master_url)
primary_info = await self.schema_registry.get_master()
self.log.info("are master %s, master url %s", primary_info.primary, primary_info.primary_url)

if (
self.schema_registry.schema_reader.master_coordinator._sc is not None # pylint: disable=protected-access
and self.schema_registry.schema_reader.master_coordinator._sc.is_master_assigned_to_myself() # pylint: disable=protected-access
):
raise HTTPResponse(
body={"master_available": are_we_master},
body={"master_available": primary_info.primary},
status=HTTPStatus.OK,
content_type=JSON_CONTENT_TYPE,
headers=no_cache_header,
)

if master_url is None or f"{self.config['advertised_hostname']}:{self.config['advertised_port']}" in master_url:
if (
primary_info.primary_url is None
or f"{self.config['advertised_hostname']}:{self.config['advertised_port']}" in primary_info.primary_url
):
raise HTTPResponse(
body={"master_available": False},
status=HTTPStatus.OK,
Expand All @@ -708,7 +703,11 @@ async def master_available(self, *, request: HTTPRequest) -> None:
)

await self._forward_request_remote(
request=request, body={}, url=f"{master_url}/master_available", content_type=JSON_CONTENT_TYPE, method="GET"
request=request,
body={},
url=f"{primary_info.primary_url}/master_available",
content_type=JSON_CONTENT_TYPE,
method="GET",
)

async def subjects_list(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None:
Expand All @@ -730,8 +729,8 @@ async def subject_delete(

permanent = request.query.get("permanent", "false").lower() == "true"

are_we_master, master_url = await self.schema_registry.get_master()
if are_we_master:
primary_info = await self.schema_registry.get_master()
if primary_info.primary:
try:
version_list = await self.schema_registry.subject_delete_local(subject=subject, permanent=permanent)
self.r([version.value for version in version_list], content_type, status=HTTPStatus.OK)
Expand Down Expand Up @@ -775,10 +774,10 @@ async def subject_delete(
content_type=content_type,
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)
elif not master_url:
elif not primary_info.primary_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}?permanent={permanent}"
url = f"{primary_info.primary_url}/subjects/{subject}?permanent={permanent}"
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE")

async def subject_version_get(
Expand Down Expand Up @@ -819,8 +818,8 @@ async def subject_version_delete(
self._check_authorization(user, Operation.Write, f"Subject:{subject}")
permanent = request.query.get("permanent", "false").lower() == "true"

are_we_master, master_url = await self.schema_registry.get_master()
if are_we_master:
primary_info = await self.schema_registry.get_master()
if primary_info.primary:
try:
resolved_version = await self.schema_registry.subject_version_delete_local(
subject, Versioner.V(version), permanent
Expand Down Expand Up @@ -882,10 +881,10 @@ async def subject_version_delete(
)
except InvalidVersion:
self._invalid_version(content_type, version)
elif not master_url:
elif not primary_info.primary_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}"
url = f"{primary_info.primary_url}/subjects/{subject}/versions/{version}?permanent={permanent}"
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE")

async def subject_version_schema_get(
Expand Down Expand Up @@ -1241,8 +1240,8 @@ async def subject_post(
if schema_id is not None:
self.r({"id": schema_id}, content_type)

are_we_master, master_url = await self.schema_registry.get_master()
if are_we_master:
primary_info = await self.schema_registry.get_master()
if primary_info.primary:
try:
schema_id = await self.schema_registry.write_new_schema_local(subject, new_schema, references)
self.r(
Expand Down Expand Up @@ -1279,10 +1278,10 @@ async def subject_post(
except Exception as xx:
raise xx

elif not master_url:
elif not primary_info.primary_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions"
url = f"{primary_info.primary_url}/subjects/{subject}/versions"
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST")

async def get_global_mode(
Expand Down
7 changes: 7 additions & 0 deletions src/karapace/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from abc import ABC, abstractmethod
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from enum import Enum, unique
from karapace.errors import InvalidVersion
from typing import Any, ClassVar, NewType, Union
Expand Down Expand Up @@ -113,3 +114,9 @@ def ready(self) -> bool:
@abstractmethod
def set_not_ready(self) -> None:
pass


@dataclass(frozen=True)
class PrimaryInfo:
    """Immutable record pairing a node's primary flag with a primary URL."""

    # True when the current node is the primary.
    primary: bool
    # URL of the primary node, or None when no primary URL is available.
    primary_url: str | None
Loading
Loading