Description:
This strategy tests whether unusual nonlinear relationships between
short-term price returns and volume changes can forecast tradable
reversals or continuations. Instead of relying on simple momentum, it
ranks recent price and volume behavior, measures their dependency with
Kendall’s tau, and looks for moments when today’s volume response is
unusually high or low relative to what the recent price move would
imply.
Core dependency idea:
\[d_t=Rank(V_t)-E[Rank(V_t)\mid Rank(r_t),\tau]\]
A large positive or negative deviation suggests price and volume are behaving abnormally.
Signal filter:
\[|d_t|>Threshold\quad and\quad |\tau|>0.10\]
Risk management uses ATR-based stops:
\[Initial\ Stop=Entry\ Price\pm2\times ATR\]
Trailing stop:
\[Trailing\ Stop=Best\ Price\mp3\times ATR\]
Python code:
import backtrader as bt
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
from scipy.stats import kendalltau
plt.rcParams['figure.figsize'] = (10, 6)
class CopulaStrategy(bt.Strategy):
params = (
('lookback', 24),
('threshold', 0.9),
('trend_period', 24),
# ATR stop settings
('atr_period', 6),
('atr_mult_sl', 2.0), # initial stop: 2 * ATR
('atr_mult_trail', 3.0), # trailing stop: Highest/Lowest ± 3 * ATR
('min_stop_step', 0.01), # avoid tiny stop updates (as fraction of price)
)
def __init__(self):
# Store price/volume history
self.price_history = []
self.volume_history = []
self.returns_history = []
# Returns and volume changes
self.returns = bt.indicators.PctChange(self.data.close, period=1)
self.volume_change = bt.indicators.PctChange(self.data.volume, period=1)
# Trend filter
self.trend_ma = bt.indicators.SMA(period=self.params.trend_period)
# Copula signals
self.copula_signal = 0
self.dependency_strength = 0
# ATR for adaptive stops
self.atr = bt.indicators.ATR(self.data, period=self.p.atr_period)
# Order/position tracking
self.order = None
self.stop_order = None
self.current_stop = None # track active stop price
self.entry_price = None
self.highest_in_pos = None # for long trailing
self.lowest_in_pos = None # for short trailing
def estimate_copula_dependency(self, x, y):
if len(x) < self.params.lookback or len(y) < self.params.lookback:
return 0, 0
try:
x_ranks = stats.rankdata(x) / (len(x) + 1)
y_ranks = stats.rankdata(y) / (len(y) + 1)
tau, _ = kendalltau(x, y)
current_x_rank = stats.rankdata(np.append(x[:-1], x[-1]))[-1] / len(x)
current_y_rank = stats.rankdata(np.append(y[:-1], y[-1]))[-1] / len(y)
expected_y_rank = 0.5
if tau > 0:
expected_y_rank = current_x_rank
elif tau < 0:
expected_y_rank = 1 - current_x_rank
deviation = current_y_rank - expected_y_rank
return deviation, abs(tau)
except:
return 0, 0
def calculate_price_volume_dependency(self):
if len(self.returns_history) < self.params.lookback:
return 0, 0
recent_returns = np.array(self.returns_history[-self.params.lookback:])
recent_volume = np.array(self.volume_history[-self.params.lookback:])
valid = ~(np.isnan(recent_returns) | np.isnan(recent_volume))
if np.sum(valid) < 20:
return 0, 0
return self.estimate_copula_dependency(recent_returns[valid], recent_volume[valid])
def _place_initial_atr_stop(self, order):
"""Place initial ATR-based stop right after entry is completed."""
if not np.isfinite(self.atr[0]):
return
if self.position.size > 0: # long
stop_price = order.executed.price - self.p.atr_mult_sl * float(self.atr[0])
self.stop_order = self.sell(exectype=bt.Order.Stop, price=stop_price)
self.current_stop = stop_price
# init trailing anchor
self.highest_in_pos = float(self.data.high[0])
self.lowest_in_pos = None
self.entry_price = order.executed.price
elif self.position.size < 0: # short
stop_price = order.executed.price + self.p.atr_mult_sl * float(self.atr[0])
self.stop_order = self.buy(exectype=bt.Order.Stop, price=stop_price)
self.current_stop = stop_price
# init trailing anchor
self.lowest_in_pos = float(self.data.low[0])
self.highest_in_pos = None
self.entry_price = order.executed.price
def _maybe_update_trailing_stop(self):
"""Chandelier-style ATR trailing stop. Only ratchets in favorable direction."""
if self.stop_order is None or self.position.size == 0:
return
if not np.isfinite(self.atr[0]):
return
price = float(self.data.close[0])
atr_now = float(self.atr[0])
step = self.p.min_stop_step * price
if self.position.size > 0:
# update anchor
self.highest_in_pos = max(self.highest_in_pos or -np.inf, float(self.data.high[0]))
new_stop = self.highest_in_pos - self.p.atr_mult_trail * atr_now
# only raise stop
if new_stop > (self.current_stop or -np.inf) + step:
# replace stop
self.cancel(self.stop_order)
self.stop_order = self.sell(exectype=bt.Order.Stop, price=new_stop)
self.current_stop = new_stop
elif self.position.size < 0:
# update anchor
self.lowest_in_pos = min(self.lowest_in_pos or np.inf, float(self.data.low[0]))
new_stop = self.lowest_in_pos + self.p.atr_mult_trail * atr_now
# only lower stop
if new_stop < (self.current_stop or np.inf) - step:
self.cancel(self.stop_order)
self.stop_order = self.buy(exectype=bt.Order.Stop, price=new_stop)
self.current_stop = new_stop
def notify_order(self, order):
if order.status in [order.Completed]:
# On any entry/flip completion, place ATR stop
if (order.isbuy() and self.position.size > 0) or (order.issell() and self.position.size < 0):
# cancel any old stop (e.g., on flip)
if self.stop_order is not None:
self.cancel(self.stop_order)
self.stop_order = None
self.current_stop = None
self._place_initial_atr_stop(order)
if order.status in [order.Completed, order.Canceled, order.Rejected]:
# clear working reference for non-stop orders
if order != self.stop_order:
self.order = None
# if the stop itself fired, clear pointers
if order == self.stop_order and order.status == order.Completed:
self.stop_order = None
self.current_stop = None
self.entry_price = None
self.highest_in_pos = None
self.lowest_in_pos = None
def next(self):
if self.order is not None:
# still have a live market/close order
self._maybe_update_trailing_stop()
return
# update histories
if not np.isnan(self.returns[0]):
self.returns_history.append(self.returns[0])
if not np.isnan(self.volume_change[0]):
self.volume_history.append(self.volume_change[0])
# truncate
maxlen = self.params.lookback * 2
if len(self.returns_history) > maxlen:
self.returns_history = self.returns_history[-maxlen:]
if len(self.volume_history) > maxlen:
self.volume_history = self.volume_history[-maxlen:]
if len(self.returns_history) < self.params.lookback:
self._maybe_update_trailing_stop()
return
deviation, strength = self.calculate_price_volume_dependency()
self.copula_signal = deviation
self.dependency_strength = strength
if strength < 0.1:
self._maybe_update_trailing_stop()
return
uptrend = self.data.close[0] > self.trend_ma[0]
sig = abs(deviation) > self.params.threshold
if sig and uptrend:
if deviation > self.params.threshold:
if self.position.size > 0:
if self.stop_order is not None:
self.cancel(self.stop_order)
self.order = self.close()
elif not self.position:
self.order = self.sell()
elif deviation < -self.params.threshold:
if self.position.size < 0:
if self.stop_order is not None:
self.cancel(self.stop_order)
self.order = self.close()
elif not self.position:
self.order = self.buy()
elif sig and not uptrend:
if deviation > self.params.threshold:
if self.position.size < 0:
if self.stop_order is not None:
self.cancel(self.stop_order)
self.order = self.close()
elif not self.position:
self.order = self.buy()
elif deviation < -self.params.threshold:
if self.position.size > 0:
if self.stop_order is not None:
self.cancel(self.stop_order)
self.order = self.close()
elif not self.position:
self.order = self.sell()
# after any trade decision, attempt to ratchet the trailing stop
self._maybe_update_trailing_stop()