# Listing metadata: 212 lines, 8.0 KiB, Python
"""Unit tests for cluster_sr_zones() in app.services.sr_service."""
|
|
|
|
from app.services.sr_service import cluster_sr_zones
|
|
|
|
|
|
def _level(price: float, strength: int = 50) -> dict:
|
|
"""Helper to build a level dict."""
|
|
return {"price_level": price, "strength": strength}
|
|
|
|
|
|
class TestClusterSrZonesEmptyAndEdge:
    """Edge cases: empty input, max_zones boundaries."""

    def test_empty_levels_returns_empty(self):
        """An empty level list is expected to produce an empty zone list."""
        assert cluster_sr_zones([], current_price=100.0) == []

    def test_max_zones_zero_returns_empty(self):
        """max_zones=0 is expected to produce no zones even with input."""
        levels = [_level(100.0)]
        assert cluster_sr_zones(levels, current_price=100.0, max_zones=0) == []

    def test_max_zones_negative_returns_empty(self):
        """A negative max_zones is expected to behave like zero."""
        levels = [_level(100.0)]
        assert cluster_sr_zones(levels, current_price=100.0, max_zones=-1) == []

    def test_single_level(self):
        """One level below the current price becomes one degenerate zone."""
        levels = [_level(95.0, 60)]
        zones = cluster_sr_zones(levels, current_price=100.0)
        assert len(zones) == 1
        z = zones[0]
        # A single-level zone has no width: low, high and midpoint coincide.
        assert z["low"] == 95.0
        assert z["high"] == 95.0
        assert z["midpoint"] == 95.0
        assert z["strength"] == 60
        assert z["type"] == "support"
        assert z["level_count"] == 1
|
|
|
|
|
|
class TestClusterSrZonesMerging:
    """Greedy merge behaviour."""

    def test_two_levels_within_tolerance_merge(self):
        """Two levels closer than the tolerance are expected to form one zone."""
        # 100 and 101 are 1% apart; tolerance=2% → should merge
        levels = [_level(100.0, 30), _level(101.0, 40)]
        zones = cluster_sr_zones(levels, current_price=200.0, tolerance=0.02)
        assert len(zones) == 1
        z = zones[0]
        assert z["low"] == 100.0
        assert z["high"] == 101.0
        assert z["midpoint"] == 100.5
        assert z["strength"] == 70
        assert z["level_count"] == 2

    def test_two_levels_outside_tolerance_stay_separate(self):
        """Two levels farther apart than the tolerance are expected to stay apart."""
        # 100 and 110 are 10% apart; tolerance=2% → separate
        levels = [_level(100.0, 30), _level(110.0, 40)]
        zones = cluster_sr_zones(levels, current_price=200.0, tolerance=0.02)
        assert len(zones) == 2

    def test_all_same_price_merge_into_one(self):
        """Identical prices are expected to collapse into one zone."""
        levels = [_level(50.0, 20), _level(50.0, 30), _level(50.0, 10)]
        zones = cluster_sr_zones(levels, current_price=100.0)
        assert len(zones) == 1
        assert zones[0]["strength"] == 60
        assert zones[0]["level_count"] == 3

    def test_levels_at_tolerance_boundary(self):
        """A level exactly at the tolerance distance is expected to merge."""
        # midpoint of cluster starting at 100 is 100. 2% of 100 = 2.
        # A level at 102 is exactly at the boundary → should merge
        levels = [_level(100.0, 25), _level(102.0, 25)]
        zones = cluster_sr_zones(levels, current_price=200.0, tolerance=0.02)
        assert len(zones) == 1
|
|
|
|
|
|
class TestClusterSrZonesStrength:
    """Strength capping and computation."""

    def test_strength_capped_at_100(self):
        """Merged strengths summing to 160 are expected to be reported as 100."""
        levels = [_level(100.0, 80), _level(100.5, 80)]
        zones = cluster_sr_zones(levels, current_price=200.0, tolerance=0.02)
        assert len(zones) == 1
        assert zones[0]["strength"] == 100

    def test_strength_sum_when_under_cap(self):
        """Merged strengths under the cap are expected to be summed as-is."""
        levels = [_level(100.0, 10), _level(100.5, 20)]
        zones = cluster_sr_zones(levels, current_price=200.0, tolerance=0.02)
        # Check the merge happened first, so a wrong zone count fails here
        # instead of surfacing as a misleading strength mismatch.
        assert len(zones) == 1
        assert zones[0]["strength"] == 30
|
|
|
|
|
|
class TestClusterSrZonesTypeTagging:
    """Support vs resistance tagging."""

    def test_support_when_midpoint_below_current(self):
        """A zone below the current price is expected to be tagged support."""
        levels = [_level(90.0, 50)]
        zones = cluster_sr_zones(levels, current_price=100.0)
        assert zones[0]["type"] == "support"

    def test_resistance_when_midpoint_above_current(self):
        """A zone above the current price is expected to be tagged resistance."""
        levels = [_level(110.0, 50)]
        zones = cluster_sr_zones(levels, current_price=100.0)
        assert zones[0]["type"] == "resistance"

    def test_resistance_when_midpoint_equals_current(self):
        """A zone exactly at the current price is expected to be resistance."""
        # "else resistance" per spec
        levels = [_level(100.0, 50)]
        zones = cluster_sr_zones(levels, current_price=100.0)
        assert zones[0]["type"] == "resistance"
|
|
|
|
|
|
class TestClusterSrZonesSorting:
    """Sorting by strength descending."""

    def test_sorted_by_strength_descending(self):
        """Returned zones are expected to be ordered strongest first."""
        levels = [_level(50.0, 20), _level(150.0, 80), _level(250.0, 50)]
        zones = cluster_sr_zones(levels, current_price=100.0, tolerance=0.001)
        strengths = [z["strength"] for z in zones]
        # An empty result is trivially "sorted"; rule it out so the ordering
        # assertion below cannot pass vacuously.
        assert strengths
        assert strengths == sorted(strengths, reverse=True)
|
|
|
|
|
|
class TestClusterSrZonesMaxZones:
    """max_zones filtering."""

    def test_max_zones_limits_output(self):
        """max_zones=2 is expected to keep one zone of each type."""
        levels = [_level(50.0, 20), _level(150.0, 80), _level(250.0, 50)]
        zones = cluster_sr_zones(
            levels, current_price=100.0, tolerance=0.001, max_zones=2
        )
        assert len(zones) == 2
        # Balanced selection: 1 support (strength 20) + 1 resistance (strength 80)
        types = {z["type"] for z in zones}
        assert "support" in types
        assert "resistance" in types
        assert zones[0]["strength"] == 80
        assert zones[1]["strength"] == 20

    def test_max_zones_none_returns_all(self):
        """max_zones=None is expected to disable the limit."""
        levels = [_level(50.0, 20), _level(150.0, 80), _level(250.0, 50)]
        zones = cluster_sr_zones(
            levels, current_price=100.0, tolerance=0.001, max_zones=None
        )
        assert len(zones) == 3

    def test_max_zones_larger_than_count_returns_all(self):
        """A limit above the number of zones is expected to drop nothing."""
        levels = [_level(50.0, 20)]
        zones = cluster_sr_zones(
            levels, current_price=100.0, max_zones=10
        )
        assert len(zones) == 1
|
|
|
|
|
|
class TestClusterSrZonesBalancedSelection:
    """Balanced interleave selection behaviour (Requirements 1.1, 1.2, 1.3, 1.5, 1.6)."""

    def test_mixed_input_produces_balanced_output(self):
        """3 support + 3 resistance with max_zones=4 → 2 support + 2 resistance."""
        levels = [
            _level(80.0, 70),   # support
            _level(85.0, 50),   # support
            _level(90.0, 30),   # support
            _level(110.0, 60),  # resistance
            _level(115.0, 40),  # resistance
            _level(120.0, 20),  # resistance
        ]
        zones = cluster_sr_zones(levels, current_price=100.0, tolerance=0.001, max_zones=4)
        assert len(zones) == 4
        support_count = sum(1 for z in zones if z["type"] == "support")
        resistance_count = sum(1 for z in zones if z["type"] == "resistance")
        assert support_count == 2
        assert resistance_count == 2

    def test_all_support_fills_from_support_only(self):
        """When no resistance levels exist, all slots filled from support."""
        levels = [
            _level(80.0, 70),
            _level(85.0, 50),
            _level(90.0, 30),
        ]
        zones = cluster_sr_zones(levels, current_price=200.0, tolerance=0.001, max_zones=2)
        assert len(zones) == 2
        assert all(z["type"] == "support" for z in zones)

    def test_all_resistance_fills_from_resistance_only(self):
        """When no support levels exist, all slots filled from resistance."""
        levels = [
            _level(110.0, 60),
            _level(115.0, 40),
            _level(120.0, 20),
        ]
        zones = cluster_sr_zones(levels, current_price=50.0, tolerance=0.001, max_zones=2)
        assert len(zones) == 2
        assert all(z["type"] == "resistance" for z in zones)

    def test_single_zone_edge_case(self):
        """Only 1 level total → returns exactly 1 zone."""
        levels = [_level(95.0, 45)]
        zones = cluster_sr_zones(levels, current_price=100.0, max_zones=5)
        assert len(zones) == 1
        assert zones[0]["strength"] == 45

    def test_both_types_present_when_max_zones_gte_2(self):
        """When both types exist and max_zones >= 2, at least one of each is present."""
        levels = [
            _level(70.0, 90),   # support (strongest overall)
            _level(75.0, 80),   # support
            _level(80.0, 70),   # support
            _level(130.0, 10),  # resistance (weakest overall)
        ]
        zones = cluster_sr_zones(levels, current_price=100.0, tolerance=0.001, max_zones=2)
        types = {z["type"] for z in zones}
        assert "support" in types
        assert "resistance" in types
|
|
|