Skip to content

Cluster Batch

Home / python / shared / cluster_batch

glide_shared.commands.batch.ClusterBatch

Bases: BaseBatch

Batch implementation for cluster GlideClusterClient. Batches allow the execution of a group of commands in a single step.

Batch Response

An array of command responses is returned by the client exec command, in the order they were given. Each element in the array represents a command given to the ClusterBatch. The response for each command depends on the executed Valkey command. Specific response types are documented alongside each method.

Parameters:

Name Type Description Default
is_atomic bool

Determines whether the batch is atomic or non-atomic. If True, the batch will be executed as an atomic transaction. If False, the batch will be executed as a non-atomic pipeline.

required

See Valkey Transactions (Atomic Batches) and Valkey Pipelines (Non-Atomic Batches) for details.

Examples:

Atomic Batch - Transaction in a Cluster:
>>> transaction = ClusterBatch(is_atomic=True)  # Atomic (Transaction)
>>> transaction.set("key", "value")
>>> transaction.get("key")
>>> result = await client.exec(transaction, false)
>>> print(result)
[OK, b"value"]
Non-Atomic Batch - Pipeline in a Cluster:
>>> pipeline = ClusterBatch(is_atomic=False)  # Non-Atomic (Pipeline)
>>> pipeline.set("key1", "value1")
>>> pipeline.set("key2", "value2")
>>> pipeline.get("key1")
>>> pipeline.get("key2")
>>> result = await client.exec(pipeline, false)
>>> print(result)
[OK, OK, b"value1", b"value2"]
Source code in glide_shared/commands/batch.py
5843
5844
5845
5846
5847
5848
5849
5850
5851
5852
5853
5854
5855
5856
5857
5858
5859
5860
5861
5862
5863
5864
5865
5866
5867
5868
5869
5870
5871
5872
5873
5874
5875
5876
5877
5878
5879
5880
5881
5882
5883
5884
5885
5886
5887
5888
5889
5890
5891
5892
5893
5894
5895
5896
5897
5898
5899
5900
5901
5902
5903
5904
5905
5906
5907
5908
5909
5910
5911
5912
5913
5914
5915
5916
5917
5918
5919
5920
5921
5922
5923
5924
5925
5926
5927
5928
5929
5930
5931
5932
5933
5934
5935
5936
5937
5938
5939
5940
5941
5942
5943
5944
5945
5946
5947
5948
5949
5950
5951
5952
5953
5954
5955
5956
5957
5958
5959
5960
5961
5962
5963
5964
5965
5966
5967
5968
5969
5970
5971
5972
5973
5974
5975
class ClusterBatch(BaseBatch):
    """
    Batch implementation for cluster GlideClusterClient. Batches allow the execution of a group
    of commands in a single step.

    Batch Response:
        An ``array`` of command responses is returned by the client ``exec`` command,
        in the order they were given. Each element in the array represents a command given to the ClusterBatch.
        The response for each command depends on the executed Valkey command.
        Specific response types are documented alongside each method.

    Args:
        is_atomic (bool): Determines whether the batch is atomic or non-atomic. If ``True``,
            the batch will be executed as an atomic transaction. If ``False``,
            the batch will be executed as a non-atomic pipeline.

    See [Valkey Transactions (Atomic Batches)](https://valkey.io/topics/transactions/) and [Valkey Pipelines (Non-Atomic Batches)](https://valkey.io/topics/pipelining/) for details.

    Examples:
        ### Atomic Batch - Transaction in a Cluster:
        >>> transaction = ClusterBatch(is_atomic=True)  # Atomic (Transaction)
        >>> transaction.set("key", "value")
        >>> transaction.get("key")
        >>> result = await client.exec(transaction, false)
        >>> print(result)
        [OK, b"value"]

        ### Non-Atomic Batch - Pipeline in a Cluster:
        >>> pipeline = ClusterBatch(is_atomic=False)  # Non-Atomic (Pipeline)
        >>> pipeline.set("key1", "value1")
        >>> pipeline.set("key2", "value2")
        >>> pipeline.get("key1")
        >>> pipeline.get("key2")
        >>> result = await client.exec(pipeline, false)
        >>> print(result)
        [OK, OK, b"value1", b"value2"]

    """

    def copy(
        self,
        source: TEncodable,
        destination: TEncodable,
        replace: Optional[bool] = None,
    ) -> "ClusterBatch":
        """
        Copies the value stored at the `source` to the `destination` key. When `replace` is True,
        removes the `destination` key first if it already exists, otherwise performs no action.

        See [valkey.io](https://valkey.io/commands/copy) for more details.

        Args:
            source (TEncodable): The key to the source value.
            destination (TEncodable): The key where the value should be copied to.
            replace (Optional[bool]): If the destination key should be removed before copying the value to it.

        Command response:
            bool: True if the source was copied.

            Otherwise, return False.

        Since: Valkey version 6.2.0.
        """
        args = [source, destination]
        if replace is not None:
            args.append("REPLACE")

        return self.append_command(RequestType.Copy, args)

    def publish(
        self, message: str, channel: str, sharded: bool = False
    ) -> "ClusterBatch":
        """
        Publish a message on pubsub channel.
        This command aggregates PUBLISH and SPUBLISH commands functionalities.
        The mode is selected using the 'sharded' parameter

        See [PUBLISH](https://valkey.io/commands/publish) and [SPUBLISH](https://valkey.io/commands/spublish)
        for more details.

        Args:
            message (str): Message to publish
            channel (str): Channel to publish the message on.
            sharded (bool): Use sharded pubsub mode. Available since Valkey version 7.0.

        Returns:
            int: Number of subscriptions in that shard that received the message.
        """
        return self.append_command(
            RequestType.SPublish if sharded else RequestType.Publish, [channel, message]
        )

    def pubsub_shardchannels(
        self, pattern: Optional[TEncodable] = None
    ) -> "ClusterBatch":
        """
        Lists the currently active shard channels.

        See [valkey.io](https://valkey.io/commands/pubsub-shardchannels) for details.

        Args:
            pattern (Optional[TEncodable]): A glob-style pattern to match active shard channels.
                If not provided, all active shard channels are returned.

        Command response:
            List[bytes]: A list of currently active shard channels matching the given pattern.

            If no pattern is specified, all active shard channels are returned.
        """
        command_args = [pattern] if pattern is not None else []
        return self.append_command(RequestType.PubSubShardChannels, command_args)

    def pubsub_shardnumsub(
        self, channels: Optional[List[TEncodable]] = None
    ) -> "ClusterBatch":
        """
        Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified shard channels.

        Note:
            It is valid to call this command without channels. In this case, it will just return an empty map.

        See [valkey.io](https://valkey.io/commands/pubsub-shardnumsub) for details.

        Args:
            channels (Optional[List[str]]): The list of shard channels to query for the number of subscribers.
                If not provided, returns an empty map.

        Command response:
            Mapping[bytes, int]: A map where keys are the shard channel names and values are the number of subscribers.
        """
        return self.append_command(
            RequestType.PubSubShardNumSub, channels if channels else []
        )

copy(source, destination, replace=None)

Copies the value stored at the source to the destination key. When replace is True, removes the destination key first if it already exists, otherwise performs no action.

See valkey.io for more details.

Parameters:

Name Type Description Default
source TEncodable

The key to the source value.

required
destination TEncodable

The key where the value should be copied to.

required
replace Optional[bool]

If the destination key should be removed before copying the value to it.

None
Command response

bool: True if the source was copied.

Otherwise, return False.

Since: Valkey version 6.2.0.

Source code in glide_shared/commands/batch.py
5882
5883
5884
5885
5886
5887
5888
5889
5890
5891
5892
5893
5894
5895
5896
5897
5898
5899
5900
5901
5902
5903
5904
5905
5906
5907
5908
5909
5910
def copy(
    self,
    source: TEncodable,
    destination: TEncodable,
    replace: Optional[bool] = None,
) -> "ClusterBatch":
    """
    Copies the value stored at the `source` to the `destination` key. When `replace` is True,
    removes the `destination` key first if it already exists, otherwise performs no action.

    See [valkey.io](https://valkey.io/commands/copy) for more details.

    Args:
        source (TEncodable): The key to the source value.
        destination (TEncodable): The key where the value should be copied to.
        replace (Optional[bool]): If the destination key should be removed before copying the value to it.

    Command response:
        bool: True if the source was copied.

        Otherwise, return False.

    Since: Valkey version 6.2.0.
    """
    args = [source, destination]
    if replace is not None:
        args.append("REPLACE")

    return self.append_command(RequestType.Copy, args)

publish(message, channel, sharded=False)

Publish a message on pubsub channel. This command aggregates PUBLISH and SPUBLISH commands functionalities. The mode is selected using the 'sharded' parameter

See PUBLISH and SPUBLISH for more details.

Parameters:

Name Type Description Default
message str

Message to publish

required
channel str

Channel to publish the message on.

required
sharded bool

Use sharded pubsub mode. Available since Valkey version 7.0.

False

Returns:

Name Type Description
int ClusterBatch

Number of subscriptions in that shard that received the message.

Source code in glide_shared/commands/batch.py
5912
5913
5914
5915
5916
5917
5918
5919
5920
5921
5922
5923
5924
5925
5926
5927
5928
5929
5930
5931
5932
5933
def publish(
    self, message: str, channel: str, sharded: bool = False
) -> "ClusterBatch":
    """
    Publish a message on pubsub channel.
    This command aggregates PUBLISH and SPUBLISH commands functionalities.
    The mode is selected using the 'sharded' parameter

    See [PUBLISH](https://valkey.io/commands/publish) and [SPUBLISH](https://valkey.io/commands/spublish)
    for more details.

    Args:
        message (str): Message to publish
        channel (str): Channel to publish the message on.
        sharded (bool): Use sharded pubsub mode. Available since Valkey version 7.0.

    Returns:
        int: Number of subscriptions in that shard that received the message.
    """
    return self.append_command(
        RequestType.SPublish if sharded else RequestType.Publish, [channel, message]
    )

pubsub_shardchannels(pattern=None)

Lists the currently active shard channels.

See valkey.io for details.

Parameters:

Name Type Description Default
pattern Optional[TEncodable]

A glob-style pattern to match active shard channels. If not provided, all active shard channels are returned.

None
Command response

List[bytes]: A list of currently active shard channels matching the given pattern.

If no pattern is specified, all active shard channels are returned.

Source code in glide_shared/commands/batch.py
5935
5936
5937
5938
5939
5940
5941
5942
5943
5944
5945
5946
5947
5948
5949
5950
5951
5952
5953
def pubsub_shardchannels(
    self, pattern: Optional[TEncodable] = None
) -> "ClusterBatch":
    """
    Lists the currently active shard channels.

    See [valkey.io](https://valkey.io/commands/pubsub-shardchannels) for details.

    Args:
        pattern (Optional[TEncodable]): A glob-style pattern to match active shard channels.
            If not provided, all active shard channels are returned.

    Command response:
        List[bytes]: A list of currently active shard channels matching the given pattern.

        If no pattern is specified, all active shard channels are returned.
    """
    command_args = [pattern] if pattern is not None else []
    return self.append_command(RequestType.PubSubShardChannels, command_args)

pubsub_shardnumsub(channels=None)

Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified shard channels.

Note

It is valid to call this command without channels. In this case, it will just return an empty map.

See valkey.io for details.

Parameters:

Name Type Description Default
channels Optional[List[str]]

The list of shard channels to query for the number of subscribers. If not provided, returns an empty map.

None
Command response

Mapping[bytes, int]: A map where keys are the shard channel names and values are the number of subscribers.

Source code in glide_shared/commands/batch.py
5955
5956
5957
5958
5959
5960
5961
5962
5963
5964
5965
5966
5967
5968
5969
5970
5971
5972
5973
5974
5975
def pubsub_shardnumsub(
    self, channels: Optional[List[TEncodable]] = None
) -> "ClusterBatch":
    """
    Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified shard channels.

    Note:
        It is valid to call this command without channels. In this case, it will just return an empty map.

    See [valkey.io](https://valkey.io/commands/pubsub-shardnumsub) for details.

    Args:
        channels (Optional[List[str]]): The list of shard channels to query for the number of subscribers.
            If not provided, returns an empty map.

    Command response:
        Mapping[bytes, int]: A map where keys are the shard channel names and values are the number of subscribers.
    """
    return self.append_command(
        RequestType.PubSubShardNumSub, channels if channels else []
    )