← Back to Home
A Copula-Based Price–Volume Dependency Strategy

A Copula-Based Price–Volume Dependency Strategy

Description:
This strategy tests whether unusual nonlinear relationships between short-term price returns and volume changes can forecast tradable reversals or continuations. Instead of relying on simple momentum, it ranks recent price and volume behavior, measures their dependency with Kendall’s tau, and looks for moments when today’s volume response is unusually high or low relative to what the recent price move would imply.

Core dependency idea:

\[d_t=Rank(V_t)-E[Rank(V_t)\mid Rank(r_t),\tau]\]

A large positive or negative deviation suggests price and volume are behaving abnormally.

Signal filter:

\[|d_t|>Threshold\quad and\quad |\tau|>0.10\]

Risk management uses ATR-based stops:

\[Initial\ Stop=Entry\ Price\pm2\times ATR\]

Trailing stop:

\[Trailing\ Stop=Best\ Price\mp3\times ATR\]

Python code:

import backtrader as bt
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
from scipy.stats import kendalltau
plt.rcParams['figure.figsize'] = (10, 6)

class CopulaStrategy(bt.Strategy):
    params = (
        ('lookback', 24),
        ('threshold', 0.9),
        ('trend_period', 24),

        # ATR stop settings
        ('atr_period', 6),
        ('atr_mult_sl', 2.0),      # initial stop: 2 * ATR
        ('atr_mult_trail', 3.0),   # trailing stop: Highest/Lowest ± 3 * ATR
        ('min_stop_step', 0.01), # avoid tiny stop updates (as fraction of price)
    )
    
    def __init__(self):
        # Store price/volume history
        self.price_history = []
        self.volume_history = []
        self.returns_history = []

        # Returns and volume changes
        self.returns = bt.indicators.PctChange(self.data.close, period=1)
        self.volume_change = bt.indicators.PctChange(self.data.volume, period=1)

        # Trend filter
        self.trend_ma = bt.indicators.SMA(period=self.params.trend_period)

        # Copula signals
        self.copula_signal = 0
        self.dependency_strength = 0

        # ATR for adaptive stops
        self.atr = bt.indicators.ATR(self.data, period=self.p.atr_period)

        # Order/position tracking
        self.order = None
        self.stop_order = None
        self.current_stop = None     # track active stop price
        self.entry_price = None
        self.highest_in_pos = None   # for long trailing
        self.lowest_in_pos = None    # for short trailing

    def estimate_copula_dependency(self, x, y):
        if len(x) < self.params.lookback or len(y) < self.params.lookback:
            return 0, 0
        try:
            x_ranks = stats.rankdata(x) / (len(x) + 1)
            y_ranks = stats.rankdata(y) / (len(y) + 1)

            tau, _ = kendalltau(x, y)

            current_x_rank = stats.rankdata(np.append(x[:-1], x[-1]))[-1] / len(x)
            current_y_rank = stats.rankdata(np.append(y[:-1], y[-1]))[-1] / len(y)

            expected_y_rank = 0.5
            if tau > 0:
                expected_y_rank = current_x_rank
            elif tau < 0:
                expected_y_rank = 1 - current_x_rank

            deviation = current_y_rank - expected_y_rank
            return deviation, abs(tau)
        except:
            return 0, 0

    def calculate_price_volume_dependency(self):
        if len(self.returns_history) < self.params.lookback:
            return 0, 0
        recent_returns = np.array(self.returns_history[-self.params.lookback:])
        recent_volume = np.array(self.volume_history[-self.params.lookback:])
        valid = ~(np.isnan(recent_returns) | np.isnan(recent_volume))
        if np.sum(valid) < 20:
            return 0, 0
        return self.estimate_copula_dependency(recent_returns[valid], recent_volume[valid])

    def _place_initial_atr_stop(self, order):
        """Place initial ATR-based stop right after entry is completed."""
        if not np.isfinite(self.atr[0]):
            return
        if self.position.size > 0:  # long
            stop_price = order.executed.price - self.p.atr_mult_sl * float(self.atr[0])
            self.stop_order = self.sell(exectype=bt.Order.Stop, price=stop_price)
            self.current_stop = stop_price
            # init trailing anchor
            self.highest_in_pos = float(self.data.high[0])
            self.lowest_in_pos = None
            self.entry_price = order.executed.price
        elif self.position.size < 0:  # short
            stop_price = order.executed.price + self.p.atr_mult_sl * float(self.atr[0])
            self.stop_order = self.buy(exectype=bt.Order.Stop, price=stop_price)
            self.current_stop = stop_price
            # init trailing anchor
            self.lowest_in_pos = float(self.data.low[0])
            self.highest_in_pos = None
            self.entry_price = order.executed.price

    def _maybe_update_trailing_stop(self):
        """Chandelier-style ATR trailing stop. Only ratchets in favorable direction."""
        if self.stop_order is None or self.position.size == 0:
            return
        if not np.isfinite(self.atr[0]):
            return

        price = float(self.data.close[0])
        atr_now = float(self.atr[0])
        step = self.p.min_stop_step * price

        if self.position.size > 0:
            # update anchor
            self.highest_in_pos = max(self.highest_in_pos or -np.inf, float(self.data.high[0]))
            new_stop = self.highest_in_pos - self.p.atr_mult_trail * atr_now
            # only raise stop
            if new_stop > (self.current_stop or -np.inf) + step:
                # replace stop
                self.cancel(self.stop_order)
                self.stop_order = self.sell(exectype=bt.Order.Stop, price=new_stop)
                self.current_stop = new_stop

        elif self.position.size < 0:
            # update anchor
            self.lowest_in_pos = min(self.lowest_in_pos or np.inf, float(self.data.low[0]))
            new_stop = self.lowest_in_pos + self.p.atr_mult_trail * atr_now
            # only lower stop
            if new_stop < (self.current_stop or np.inf) - step:
                self.cancel(self.stop_order)
                self.stop_order = self.buy(exectype=bt.Order.Stop, price=new_stop)
                self.current_stop = new_stop

    def notify_order(self, order):
        if order.status in [order.Completed]:
            # On any entry/flip completion, place ATR stop
            if (order.isbuy() and self.position.size > 0) or (order.issell() and self.position.size < 0):
                # cancel any old stop (e.g., on flip)
                if self.stop_order is not None:
                    self.cancel(self.stop_order)
                    self.stop_order = None
                    self.current_stop = None
                self._place_initial_atr_stop(order)

        if order.status in [order.Completed, order.Canceled, order.Rejected]:
            # clear working reference for non-stop orders
            if order != self.stop_order:
                self.order = None
            # if the stop itself fired, clear pointers
            if order == self.stop_order and order.status == order.Completed:
                self.stop_order = None
                self.current_stop = None
                self.entry_price = None
                self.highest_in_pos = None
                self.lowest_in_pos = None

    def next(self):
        if self.order is not None:
            # still have a live market/close order
            self._maybe_update_trailing_stop()
            return

        # update histories
        if not np.isnan(self.returns[0]):
            self.returns_history.append(self.returns[0])
        if not np.isnan(self.volume_change[0]):
            self.volume_history.append(self.volume_change[0])

        # truncate
        maxlen = self.params.lookback * 2
        if len(self.returns_history) > maxlen:
            self.returns_history = self.returns_history[-maxlen:]
        if len(self.volume_history) > maxlen:
            self.volume_history = self.volume_history[-maxlen:]

        if len(self.returns_history) < self.params.lookback:
            self._maybe_update_trailing_stop()
            return

        deviation, strength = self.calculate_price_volume_dependency()
        self.copula_signal = deviation
        self.dependency_strength = strength

        if strength < 0.1:
            self._maybe_update_trailing_stop()
            return

        uptrend = self.data.close[0] > self.trend_ma[0]
        sig = abs(deviation) > self.params.threshold

        if sig and uptrend:
            if deviation > self.params.threshold:
                if self.position.size > 0:
                    if self.stop_order is not None:
                        self.cancel(self.stop_order)
                    self.order = self.close()
                elif not self.position:
                    self.order = self.sell()
            elif deviation < -self.params.threshold:
                if self.position.size < 0:
                    if self.stop_order is not None:
                        self.cancel(self.stop_order)
                    self.order = self.close()
                elif not self.position:
                    self.order = self.buy()

        elif sig and not uptrend:
            if deviation > self.params.threshold:
                if self.position.size < 0:
                    if self.stop_order is not None:
                        self.cancel(self.stop_order)
                    self.order = self.close()
                elif not self.position:
                    self.order = self.buy()
            elif deviation < -self.params.threshold:
                if self.position.size > 0:
                    if self.stop_order is not None:
                        self.cancel(self.stop_order)
                    self.order = self.close()
                elif not self.position:
                    self.order = self.sell()

        # after any trade decision, attempt to ratchet the trailing stop
        self._maybe_update_trailing_stop()