Skip to content

Commit 12c9a38

Browse files
authored
Fix RuntimeError in ClusterPubSub sharded message generator (#3889)
* Fix RuntimeError in ClusterPubSub sharded message generator * Applying review comments
1 parent 9205321 commit 12c9a38

File tree

2 files changed

+116
-1
lines changed

2 files changed

+116
-1
lines changed

redis/cluster.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2208,7 +2208,8 @@ def _sharded_message_generator(self):
22082208

22092209
def _pubsubs_generator(self):
22102210
while True:
2211-
yield from self.node_pubsub_mapping.values()
2211+
current_nodes = list(self.node_pubsub_mapping.values())
2212+
yield from current_nodes
22122213

22132214
def get_sharded_message(
22142215
self, ignore_subscribe_messages=False, timeout=0.0, target_node=None

tests/test_pubsub.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,120 @@ def test_pubsub_shardnumsub(self, r):
871871
channels = [(b"foo", 1), (b"bar", 2), (b"baz", 3)]
872872
assert r.pubsub_shardnumsub("foo", "bar", "baz", target_nodes="all") == channels
873873

874+
@pytest.mark.onlycluster
875+
@skip_if_server_version_lt("7.0.0")
876+
def test_ssubscribe_multiple_channels_different_nodes(self, r):
877+
"""
878+
Test subscribing to multiple sharded channels on different nodes.
879+
Validates that the generator properly handles multiple node_pubsub_mapping entries.
880+
"""
881+
pubsub = r.pubsub()
882+
channel1 = "test-channel:{0}"
883+
channel2 = "test-channel:{6}"
884+
885+
# Subscribe to first channel
886+
pubsub.ssubscribe(channel1)
887+
msg = wait_for_message(pubsub, timeout=1.0, func=pubsub.get_sharded_message)
888+
assert msg is not None
889+
assert msg["type"] == "ssubscribe"
890+
891+
# Subscribe to second channel (likely different node)
892+
pubsub.ssubscribe(channel2)
893+
msg = wait_for_message(pubsub, timeout=1.0, func=pubsub.get_sharded_message)
894+
assert msg is not None
895+
assert msg["type"] == "ssubscribe"
896+
897+
# Verify both channels are in shard_channels
898+
assert channel1.encode() in pubsub.shard_channels
899+
assert channel2.encode() in pubsub.shard_channels
900+
901+
pubsub.close()
902+
903+
@pytest.mark.onlycluster
904+
@skip_if_server_version_lt("7.0.0")
905+
def test_ssubscribe_multiple_channels_publish_and_read(self, r):
906+
"""
907+
Test publishing to multiple sharded channels and reading messages.
908+
Validates that _sharded_message_generator properly cycles through
909+
multiple node_pubsub_mapping entries.
910+
"""
911+
pubsub = r.pubsub()
912+
channel1 = "test-channel:{0}"
913+
channel2 = "test-channel:{6}"
914+
msg1_data = "message-1"
915+
msg2_data = "message-2"
916+
917+
# Subscribe to both channels
918+
pubsub.ssubscribe(channel1, channel2)
919+
920+
# Read subscription confirmations
921+
for _ in range(2):
922+
msg = wait_for_message(pubsub, timeout=1.0, func=pubsub.get_sharded_message)
923+
assert msg is not None
924+
assert msg["type"] == "ssubscribe"
925+
926+
# Publish messages to both channels
927+
r.spublish(channel1, msg1_data)
928+
r.spublish(channel2, msg2_data)
929+
930+
# Read messages - should get both messages
931+
messages = []
932+
for _ in range(2):
933+
msg = wait_for_message(pubsub, timeout=1.0, func=pubsub.get_sharded_message)
934+
assert msg is not None
935+
assert msg["type"] == "smessage"
936+
messages.append(msg)
937+
938+
# Verify we got messages from both channels
939+
channels_received = {msg["channel"] for msg in messages}
940+
assert channel1.encode() in channels_received
941+
assert channel2.encode() in channels_received
942+
943+
pubsub.close()
944+
945+
@pytest.mark.onlycluster
946+
@skip_if_server_version_lt("7.0.0")
947+
def test_generator_handles_concurrent_mapping_changes(self, r):
948+
"""
949+
Test that the generator properly handles mapping changes during iteration.
950+
This validates the fix for the RuntimeError: dictionary changed size during iteration.
951+
"""
952+
pubsub = r.pubsub()
953+
channel1 = "test-channel:{0}"
954+
channel2 = "test-channel:{6}"
955+
956+
# Subscribe to first channel
957+
pubsub.ssubscribe(channel1)
958+
msg = wait_for_message(pubsub, timeout=1.0, func=pubsub.get_sharded_message)
959+
assert msg is not None
960+
assert msg["type"] == "ssubscribe"
961+
962+
# Get initial mapping size (cluster pubsub only)
963+
assert hasattr(pubsub, "node_pubsub_mapping"), "Test requires ClusterPubSub"
964+
initial_size = len(pubsub.node_pubsub_mapping)
965+
966+
# Subscribe to second channel (modifies mapping during potential iteration)
967+
pubsub.ssubscribe(channel2)
968+
msg = wait_for_message(pubsub, timeout=1.0, func=pubsub.get_sharded_message)
969+
assert msg is not None
970+
assert msg["type"] == "ssubscribe"
971+
972+
# Verify mapping was updated
973+
assert len(pubsub.node_pubsub_mapping) >= initial_size
974+
975+
# Publish and read messages - should not raise RuntimeError
976+
r.spublish(channel1, "msg1")
977+
r.spublish(channel2, "msg2")
978+
979+
messages_received = 0
980+
for _ in range(2):
981+
msg = wait_for_message(pubsub, timeout=1.0, func=pubsub.get_sharded_message)
982+
if msg and msg["type"] == "smessage":
983+
messages_received += 1
984+
985+
assert messages_received == 2
986+
pubsub.close()
987+
874988

875989
class TestPubSubPings:
876990
@skip_if_server_version_lt("3.0.0")

0 commit comments

Comments
 (0)