Skip to content

Pulsar

Fixtures

pytest_streaming.pulsar.fixtures

Functions:

Name Description
streaming_pulsar_marker

Usable PulsarMarker object

streaming_pulsar_client

Raw pulsar client using the service url configured for the given test.

streaming_pulsar_consumer

Raw pulsar consumer using the topics configured for the given test. Yields a unique subscription name each time.

streaming_pulsar_producers

Raw pulsar producer using the topics configured for the given test.

streaming_pulsar_marker(request, pytestconfig)

Usable PulsarMarker object

Yields the base pulsar marker object that gives you access to the designated configurations for the individual test. See PulsarMarker specification.

Example
    from streaming.pulsar.markers import PulsarMarker

    @pytest.mark.pulsar(topics=["topic-a", "topic-b"])
    def test_pulsar_topics(streaming_pulsar_marker: PulsarMarker):
        assert PulsarMarker.topics == ["topic-a", "topic-b"]

Returns:

Name Type Description
PulsarMarker PulsarMarker

object with all of the defined user configurations

Source code in pytest_streaming/pulsar/fixtures.py
@pytest.fixture
def streaming_pulsar_marker(request: FixtureRequest, pytestconfig: Config) -> PulsarMarker:
    """Usable PulsarMarker object

    Yields the base pulsar marker object that gives you access to the designated
    configurations for the individual test. See PulsarMarker specification.

    Example:
        ```python
            from streaming.pulsar.markers import PulsarMarker

            @pytest.mark.pulsar(topics=["topic-a", "topic-b"])
            def test_pulsar_topics(streaming_pulsar_marker: PulsarMarker):
                assert PulsarMarker.topics == ["topic-a", "topic-b"]
        ```

    Returns:
        PulsarMarker: object with all of the defined user configurations

    """
    return PulsarMarker(config=pytestconfig, request=request)

streaming_pulsar_client(streaming_pulsar_marker)

Raw pulsar client using the service url configured for the given test.

Does all of the necessary cleanup for you after the test concludes.

Example
    from pulsar import Client

    @pytest.mark.pulsar(topics=["topic-a", "topic-b"])
    def test_pulsar_topics(streaming_pulsar_client: Client):
        assert isinstance(streaming_pulsar_client, Client)

Returns:

Type Description
Generator[Client, None]

pulsar.Client: raw pulsar client from the base pulsar library

Source code in pytest_streaming/pulsar/fixtures.py
@pytest.fixture
def streaming_pulsar_client(streaming_pulsar_marker: PulsarMarker) -> Generator[Client, None]:
    """Raw pulsar client using the service url configured for the given test.

    Does all of the necessary cleanup for you after the test concludes.

    Example:
        ```python
            from pulsar import Client

            @pytest.mark.pulsar(topics=["topic-a", "topic-b"])
            def test_pulsar_topics(streaming_pulsar_client: Client):
                assert isinstance(streaming_pulsar_client, Client)
        ```

    Returns:
        pulsar.Client: raw pulsar client from the base pulsar library
    """
    client = Client(service_url=streaming_pulsar_marker.service_url)
    try:
        yield client
    finally:
        client.close()
        del client

streaming_pulsar_consumer(streaming_pulsar_client, streaming_pulsar_marker)

Raw pulsar consumer using the topics configured for the given test. Yields a unique subscription name each time.

Does all of the necessary cleanup for you after the test concludes.

Example
    from pulsar import Consumer

    @pytest.mark.pulsar(topics=["topic-a", "topic-b"])
    def test_pulsar_topics(streaming_pulsar_consumer: Consumer):
        print(streaming_pulsar_consumer.subscription_name)
        msg = streaming_pulsar_consumer.receive()

Returns:

Type Description
Generator[Consumer, None]

pulsar.Consumer: raw pulsar consumer from the base pulsar library

Source code in pytest_streaming/pulsar/fixtures.py
@pytest.fixture
def streaming_pulsar_consumer(
    streaming_pulsar_client: Client, streaming_pulsar_marker: PulsarMarker
) -> Generator[Consumer, None]:
    """Raw pulsar consumer using the topics configured for the given test. Yields a unique subscription name each time.

    Does all of the necessary cleanup for you after the test concludes.

    Example:
        ```python
            from pulsar import Consumer

            @pytest.mark.pulsar(topics=["topic-a", "topic-b"])
            def test_pulsar_topics(streaming_pulsar_consumer: Consumer):
                print(streaming_pulsar_consumer.subscription_name)
                msg = streaming_pulsar_consumer.receive()
        ```

    Returns:
        pulsar.Consumer: raw pulsar consumer from the base pulsar library
    """
    consumer = streaming_pulsar_client.subscribe(
        topic=streaming_pulsar_marker.topics, subscription_name=str(uuid.uuid4())
    )
    try:
        yield consumer
    finally:
        consumer.close()
        del consumer

streaming_pulsar_producers(streaming_pulsar_client, streaming_pulsar_marker)

Raw pulsar producer using the topics configured for the given test.

Does all of the necessary cleanup for you after the test concludes.

Example
    from pulsar import Producer

    @pytest.mark.pulsar(topics=["topic-a", "topic-b"])
    def test_pulsar_topics(streaming_pulsar_producers: dict[str, Producer]):
        producer_a = streaming_pulsar_producers["topic-a"]
        producer_b = streaming_pulsar_producers["topic-b"]
        producer_a.send(...)
        producer_b.send(...)

Returns:

Type Description
Generator[dict[str, Producer], None]

dict[topic.name, pulsar.Producer]: raw pulsar producers from the base pulsar library

Source code in pytest_streaming/pulsar/fixtures.py
@pytest.fixture
def streaming_pulsar_producers(
    streaming_pulsar_client: Client, streaming_pulsar_marker: PulsarMarker
) -> Generator[dict[str, Producer], None]:
    """Raw pulsar producer using the topics configured for the given test.

    Does all of the necessary cleanup for you after the test concludes.

    Example:
        ```python
            from pulsar import Producer

            @pytest.mark.pulsar(topics=["topic-a", "topic-b"])
            def test_pulsar_topics(streaming_pulsar_producers: dict[str, Producer]):
                producer_a = streaming_pulsar_producers["topic-a"]
                producer_b = streaming_pulsar_producers["topic-b"]
                producer_a.send(...)
                producer_b.send(...)
        ```

    Returns:
        dict[topic.name, pulsar.Producer]: raw pulsar producers from the base pulsar library
    """

    # TODO: update to property w/ support for dynamic tenant/namespace
    topic_objs = [
        TopicMeta(
            topic_name=topic, tenant=streaming_pulsar_marker._tenant, namespace=streaming_pulsar_marker._namespace
        )
        for topic in streaming_pulsar_marker.topics
    ]

    producers = {topic_obj.short: streaming_pulsar_client.create_producer(topic_obj.long) for topic_obj in topic_objs}

    try:
        yield producers
    finally:
        for _, producer in producers.items():
            producer.close()
            del producer

 

Markers

pytest_streaming.pulsar.markers.PulsarMarker

Bases: BaseMarker

Primary pulsar marker for working with Pulsar topics.

This marker allows you to create and delete Pulsar topics for testing purposes. It ensures the specified tenant and namespace exist before creating topics. By default, topics are recreated if they already exist.

Attributes:

Name Type Description
- marker_name (str

name of the marker

- marker_description (str

description of the marker

- topics (list[str]

A list of Pulsar topic names to create.

- delete_after (bool

If True, the topics will be deleted after the test. (default: False)

- service_url (str

The Pulsar service URL. (default: from pytest.ini or Defaults.PULSAR_SERVICE_URL)

- admin_url (str

The pulsar admin URL (default: from pytest.ini or Defaults.PULSAR_ADMIN_URL)

Required Parameters
  • topics (list[str])
Optional Parameters
  • delete_after (bool)
  • service_url (str)
  • admin_url (str)
Example
@pytest.mark.pulsar(topics=["topic-a", "topic-b"])
def test_pulsar_topics():
    # Your test code here
    pass
Source code in pytest_streaming/pulsar/markers.py
class PulsarMarker(BaseMarker):
    """Primary pulsar marker for working with Pulsar topics.

    This marker allows you to create and delete Pulsar topics for testing purposes.
    It ensures the specified tenant and namespace exist before creating topics.
    By default, topics are recreated if they already exist.

    Attributes:
        - marker_name (str): name of the marker
        - marker_description (str): description of the marker
        - topics (list[str]): A list of Pulsar topic names to create.
        - delete_after (bool): If True, the topics will be deleted after the test. (default: False)
        - service_url (str): The Pulsar service URL. (default: from pytest.ini or Defaults.PULSAR_SERVICE_URL)
        - admin_url (str): The pulsar admin URL (default: from pytest.ini or Defaults.PULSAR_ADMIN_URL)

    Required Parameters:
        - topics (list[str])

    Optional Parameters:
        - delete_after (bool)
        - service_url (str)
        - admin_url (str)

    Example:
        ```python
        @pytest.mark.pulsar(topics=["topic-a", "topic-b"])
        def test_pulsar_topics():
            # Your test code here
            pass
        ```
    """

    marker_name: str = "pulsar"
    marker_description: str = "Create specified Pulsar topics automatically for the test."
    marker_params: list[str] = [param.value for param in PulsarMarkerParams]

    # Default values for the marker parameters
    _tenant: str = Defaults.PULSAR_TENANT.value
    _namespace: str = Defaults.PULSAR_NAMESPACE.value

    @property
    def topics(self) -> list[str]:
        if not self.marker:
            raise ValueError("Marker (pulsar) is not set")  # pragma: no cover

        topics = self.marker.kwargs.get(PulsarMarkerParams.TOPICS.root())
        if not topics or not isinstance(topics, list) or not all(isinstance(topic, str) for topic in topics):
            raise ValueError("No topics specified or invalid specification (list[str]) for the pulsar marker")
        return cast(list[str], topics)

    @property
    def delete_after(self) -> bool:
        if not self.marker:
            raise ValueError("Marker (pulsar) is not set")  # pragma: no cover

        delete_after = self.marker.kwargs.get(PulsarMarkerParams.DELETE_AFTER.root(), Defaults.PULSAR_AUTO_DELETE.value)
        if not isinstance(delete_after, bool):
            raise ValueError("Invalid specification for delete_after (bool)")  # pragma: no cover
        return delete_after

    @property
    def service_url(self) -> str:
        if not self.marker:
            raise ValueError("Marker (pulsar) is not set")  # pragma: no cover

        override_url = self.marker.kwargs.get(PulsarMarkerParams.SERVICE_URL.root())
        service_url = override_url or self.config.getini(Configuration.PULSAR_SERVICE_URL)
        if not isinstance(service_url, str):
            raise ValueError("Invalid specification for service_url (str)")  # pragma: no cover
        return service_url

    @property
    def admin_url(self) -> str:
        if not self.marker:
            raise ValueError("Marker (pulsar) is not set")  # pragma: no cover

        override_url = self.marker.kwargs.get(PulsarMarkerParams.ADMIN_URL.root())
        admin_url = override_url or self.config.getini(Configuration.PULSAR_ADMIN_URL)
        if not isinstance(admin_url, str):
            raise ValueError("Invalid specification for admin_url (str)")  # pragma: no cover
        return admin_url

    @property
    def _topic_meta(self) -> list[TopicMeta]:
        return [TopicMeta(topic_name=topic, tenant=self._tenant, namespace=self._namespace) for topic in self.topics]

    @cached_property
    def _pulsar_client(self) -> PulsarClientWrapper:
        return PulsarClientWrapper(service_url=self.service_url, admin_url=self.admin_url)

    @contextmanager
    def impl(self) -> Generator[None, None, None]:
        if not self.marker:
            yield
            return

        try:
            self._pulsar_client.setup_testing_topics(topics=self._topic_meta)

            yield

            if self.delete_after:
                self._pulsar_client.delete_testing_topics(topics=self._topic_meta)
        finally:
            self._pulsar_client.close()