6  Joins Avanzados: Non-Equi y Rolling Joins

En este capítulo dominarás
  • Non-equi joins: Uniones basadas en rangos y desigualdades
  • Rolling joins: La herramienta perfecta para series temporales
  • Update joins avanzados con condiciones complejas
  • Joins múltiples con condiciones heterogéneas
  • Casos de uso reales en finanzas, medicina y análisis temporal

6.1 Non-Equi Joins: Más Allá de la Igualdad

Los non-equi joins permiten unir tablas basándose en rangos, desigualdades y condiciones complejas. Son especialmente útiles en análisis médico, financiero y clasificación por rangos.

6.1.1 1. Conceptos Fundamentales

Un non-equi join utiliza operadores de comparación (<=, >=, <, >) en lugar de igualdad (==):

# Ejemplo simple: clasificar pacientes por IMC
rangos_imc <- rangos_medicos[parametro == "IMC"]
print("Rangos de IMC:")
#> [1] "Rangos de IMC:"
print(rangos_imc)
#>    parametro valor_min valor_max     categoria nivel_riesgo
#>       <char>     <num>     <num>        <char>       <char>
#> 1:       IMC       0.0      18.4   Bajo/Normal         Bajo
#> 2:       IMC      18.5      24.9 Normal/Óptimo       Normal
#> 3:       IMC      25.0       Inf   Alto/Riesgo         Alto

# Non-equi join básico
pacientes_clasificados_imc <- rangos_imc[pacientes_clinica,
  on = .(valor_min <= imc, valor_max >= imc),
  .(paciente_id, nombre, imc, categoria, nivel_riesgo)]

print("Pacientes clasificados por IMC:")
#> [1] "Pacientes clasificados por IMC:"
print(head(pacientes_clasificados_imc[order(imc)], 10))
#>     paciente_id      nombre   imc   categoria nivel_riesgo
#>           <int>      <char> <num>      <char>       <char>
#>  1:          69 Paciente_69   8.6 Bajo/Normal         Bajo
#>  2:           5  Paciente_5  11.2 Bajo/Normal         Bajo
#>  3:          98 Paciente_98  11.7 Bajo/Normal         Bajo
#>  4:          57 Paciente_57  12.2 Bajo/Normal         Bajo
#>  5:          46 Paciente_46  13.1 Bajo/Normal         Bajo
#>  6:          73 Paciente_73  13.9 Bajo/Normal         Bajo
#>  7:          66 Paciente_66  14.7 Bajo/Normal         Bajo
#>  8:          90 Paciente_90  14.7 Bajo/Normal         Bajo
#>  9:           7  Paciente_7  15.5 Bajo/Normal         Bajo
#> 10:          65 Paciente_65  15.7 Bajo/Normal         Bajo

6.1.2 2. Non-Equi Join con Múltiples Condiciones

# Clasificar pacientes por múltiples parámetros simultáneamente
# Crear función auxiliar para clasificar
clasificar_parametro <- function(dt, param_name, value_col) {
  rangos <- rangos_medicos[parametro == param_name]
  resultado <- rangos[dt, on = c("valor_min" = paste0(value_col, ">="), "valor_max" = paste0(value_col, "<=")),
                     .(paciente_id, parametro, categoria, nivel_riesgo),
                     nomatch = NULL]
  return(resultado)
}

# Clasificar por glucosa
pacientes_glucosa <- rangos_medicos[parametro == "Glucosa"][pacientes_clinica,
  on = .(valor_min <= glucosa, valor_max >= glucosa),
  .(paciente_id, parametro = "Glucosa", valor = glucosa, categoria, nivel_riesgo)]

# Clasificar por presión sistólica
pacientes_presion <- rangos_medicos[parametro == "Presión"][pacientes_clinica,
  on = .(valor_min <= presion_sistolica, valor_max >= presion_sistolica),  
  .(paciente_id, parametro = "Presion", valor = presion_sistolica, categoria, nivel_riesgo)]

# Combinar clasificaciones
todas_clasificaciones <- rbind(
  pacientes_glucosa[!is.na(categoria)],
  pacientes_presion[!is.na(categoria)]
)

# Resumen de riesgos por paciente
resumen_riesgo_pacientes <- todas_clasificaciones[,
  .(
    parametros_evaluados = .N,
    riesgos_altos = sum(nivel_riesgo == "Alto"),
    riesgos_normales = sum(nivel_riesgo == "Normal"),
    clasificacion_general = fcase(
      sum(nivel_riesgo == "Alto") >= 2, "Alto Riesgo Múltiple",
      sum(nivel_riesgo == "Alto") == 1, "Riesgo Moderado", 
      default = "Bajo Riesgo"
    )
  ), by = paciente_id]

print("Resumen de riesgos por paciente:")
#> [1] "Resumen de riesgos por paciente:"
print(head(resumen_riesgo_pacientes[order(-riesgos_altos)], 10))
#>     paciente_id parametros_evaluados riesgos_altos riesgos_normales
#>           <int>                <int>         <int>            <int>
#>  1:           1                    2             2                0
#>  2:           2                    2             2                0
#>  3:           4                    2             2                0
#>  4:           8                    2             2                0
#>  5:           9                    2             2                0
#>  6:          12                    2             2                0
#>  7:          16                    2             2                0
#>  8:          18                    2             2                0
#>  9:          19                    2             2                0
#> 10:          20                    2             2                0
#>     clasificacion_general
#>                    <char>
#>  1:  Alto Riesgo Múltiple
#>  2:  Alto Riesgo Múltiple
#>  3:  Alto Riesgo Múltiple
#>  4:  Alto Riesgo Múltiple
#>  5:  Alto Riesgo Múltiple
#>  6:  Alto Riesgo Múltiple
#>  7:  Alto Riesgo Múltiple
#>  8:  Alto Riesgo Múltiple
#>  9:  Alto Riesgo Múltiple
#> 10:  Alto Riesgo Múltiple

6.1.3 3. Non-Equi Join para Ventanas Temporales

# Encontrar todas las operaciones que ocurrieron dentro de ventanas de eventos
operaciones_en_ventanas <- eventos_mercado[operaciones_trading,
  on = .(ticker_afectado = ticker,
         fecha_inicio_ventana <= fecha_operacion,
         fecha_fin_ventana >= fecha_operacion),
  .(evento_id, tipo_evento, fecha_evento, impacto_esperado,
    operacion_id, fecha_operacion, tipo, cantidad, precio_limite),
  nomatch = NULL]

# Análisis de comportamiento en ventanas de eventos
comportamiento_eventos <- operaciones_en_ventanas[,
  .(
    operaciones_total = .N,
    operaciones_compra = sum(tipo == "COMPRA"),
    operaciones_venta = sum(tipo == "VENTA"),
    volumen_total = sum(cantidad),
    precio_promedio = round(mean(precio_limite), 2),
    dias_promedio_evento = round(mean(as.numeric(abs(fecha_operacion - fecha_evento))), 1)
  ), by = .(tipo_evento, impacto_esperado)]

print("Comportamiento de trading en ventanas de eventos:")
#> [1] "Comportamiento de trading en ventanas de eventos:"
print(comportamiento_eventos[order(-volumen_total)])
#>       tipo_evento impacto_esperado operaciones_total operaciones_compra
#>            <char>           <char>             <int>              <int>
#> 1:       Earnings         Positivo                 7                  4
#> 2:       Earnings           Neutro                 9                  7
#> 3:         Merger         Positivo                 8                  4
#> 4:   FDA_Approval         Positivo                 7                  3
#> 5:   FDA_Approval           Neutro                 5                  5
#> 6:          Split         Positivo                 6                  3
#> 7:         Merger           Neutro                 1                  1
#> 8: Product_Launch         Positivo                 2                  0
#> 9:       Earnings         Negativo                 1                  1
#>    operaciones_venta volumen_total precio_promedio dias_promedio_evento
#>                <int>         <num>           <num>                <num>
#> 1:                 3          5550         1114.30                  3.1
#> 2:                 2          3150         1407.04                  4.3
#> 3:                 4          3100         1581.95                  5.1
#> 4:                 4          2400         1328.23                  3.1
#> 5:                 0          1950         1803.68                  4.8
#> 6:                 3          1200         1841.82                  4.8
#> 7:                 0          1000         2428.78                  2.0
#> 8:                 2           600         1234.13                  2.0
#> 9:                 0           500         1804.09                  5.0

6.2 Rolling Joins: La Joya para Series Temporales

Los rolling joins son perfectos para conectar cada observación con el último valor disponible en el tiempo.

6.2.1 1. Rolling Join Básico

# Preparar datos con keys para rolling join
precios_key <- copy(precios_historicos)
operaciones_key <- copy(operaciones_trading)

# Establecer keys compuestas: ticker + fecha
setkey(precios_key, ticker, fecha)
setkey(operaciones_key, ticker, fecha_operacion)

# Rolling join: obtener el último precio disponible para cada operación
operaciones_con_precio <- precios_key[operaciones_key, roll = TRUE]

# Verificar estructura del resultado
print("Columnas disponibles después del rolling join:")
#> [1] "Columnas disponibles después del rolling join:"
print(names(operaciones_con_precio))
#> [1] "fecha"         "ticker"        "precio_cierre" "volumen"      
#> [5] "operacion_id"  "tipo"          "cantidad"      "precio_limite"
#> [9] "trader_id"
print("Primeras filas para entender la estructura:")
#> [1] "Primeras filas para entender la estructura:"
print(head(operaciones_con_precio, 3))
#> Key: <ticker, fecha>
#>         fecha ticker precio_cierre  volumen operacion_id   tipo cantidad
#>        <Date> <char>         <num>    <num>        <int> <char>    <num>
#> 1: 2024-01-01   AAPL        155.41 15619654          471 COMPRA       50
#> 2: 2024-01-04   AAPL        157.65 42789129          437 COMPRA      100
#> 3: 2024-01-06   AAPL        170.90  4219831           37  VENTA       50
#>    precio_limite trader_id
#>            <num>    <char>
#> 1:       1827.71        T5
#> 2:       2802.78       T18
#> 3:        971.83        T9

# En un rolling join X[Y], el resultado incluye todas las columnas de Y más las de X
# Las columnas de la fecha de Y se mantienen, no se prefijan con i.
print("Operaciones con precios históricos (rolling join):")
#> [1] "Operaciones con precios históricos (rolling join):"
if("fecha_operacion" %in% names(operaciones_con_precio)) {
  # Si fecha_operacion existe directamente
  print(head(operaciones_con_precio[, .(ticker, fecha, precio_cierre, 
                                       operacion_id, fecha_operacion, tipo, cantidad)], 10))
  # Calcular diferencia temporal
  operaciones_con_precio[, dias_diferencia := as.numeric(fecha_operacion - fecha)]
} else {
  # Buscar columnas con fecha en el nombre
  fecha_cols <- grep("fecha", names(operaciones_con_precio), value = TRUE)
  print(paste("Columnas con 'fecha' encontradas:", paste(fecha_cols, collapse = ", ")))
  
  # Mostrar primeras columnas disponibles
  print(head(operaciones_con_precio[, 1:min(8, ncol(operaciones_con_precio))], 10))
}
#> [1] "Columnas con 'fecha' encontradas: fecha"
#> Key: <ticker, fecha>
#>          fecha ticker precio_cierre  volumen operacion_id   tipo cantidad
#>         <Date> <char>         <num>    <num>        <int> <char>    <num>
#>  1: 2024-01-01   AAPL        155.41 15619654          471 COMPRA       50
#>  2: 2024-01-04   AAPL        157.65 42789129          437 COMPRA      100
#>  3: 2024-01-06   AAPL        170.90  4219831           37  VENTA       50
#>  4: 2024-01-17   AAPL        144.56 22593327          260 COMPRA      100
#>  5: 2024-01-21   AAPL        136.44 22967352           35 COMPRA      200
#>  6: 2024-01-25   AAPL        146.15 16764243          362  VENTA     1000
#>  7: 2024-01-26   AAPL        138.94 25121176          302 COMPRA      100
#>  8: 2024-01-28   AAPL        142.77  4554781          102 COMPRA      100
#>  9: 2024-02-01   AAPL        125.12 39017505          135 COMPRA      100
#> 10: 2024-02-12   AAPL        130.18  3604906          237 COMPRA      200
#>     precio_limite
#>             <num>
#>  1:       1827.71
#>  2:       2802.78
#>  3:        971.83
#>  4:        740.76
#>  5:        463.55
#>  6:       1978.25
#>  7:        758.49
#>  8:       2694.70
#>  9:       1218.84
#> 10:       1558.66

# Estadísticas de la calidad del match (solo si dias_diferencia fue creada)
if("dias_diferencia" %in% names(operaciones_con_precio)) {
  cat("Estadísticas del rolling join:\n")
  cat("• Operaciones con precio exacto (mismo día):", sum(operaciones_con_precio$dias_diferencia == 0, na.rm = TRUE), "\n")
  cat("• Operaciones con precio de días anteriores:", sum(operaciones_con_precio$dias_diferencia > 0, na.rm = TRUE), "\n")
  cat("• Diferencia promedio en días:", round(mean(operaciones_con_precio$dias_diferencia, na.rm = TRUE), 1), "\n")
} else {
  cat("La variable dias_diferencia no pudo ser creada debido a problemas con las columnas de fecha.\n")
}
#> La variable dias_diferencia no pudo ser creada debido a problemas con las columnas de fecha.

6.2.2 2. Rolling Join con Límites Temporales

# Rolling join con límite: solo usar precios de máximo 7 días anteriores
operaciones_precio_limitado <- precios_key[operaciones_key, 
                                          roll = 7,  # máximo 7 días
                                          rollends = c(TRUE, TRUE)]

# Comparar con rolling join ilimitado
matches_limitado <- sum(!is.na(operaciones_precio_limitado$precio_cierre))
matches_ilimitado <- sum(!is.na(operaciones_con_precio$precio_cierre))

cat("Comparación de rolling joins:\n")
#> Comparación de rolling joins:
cat("• Matches con rolling limitado (7 días):", matches_limitado, "\n") 
#> • Matches con rolling limitado (7 días): 111
cat("• Matches con rolling ilimitado:", matches_ilimitado, "\n")
#> • Matches con rolling ilimitado: 291
cat("• Diferencia:", matches_ilimitado - matches_limitado, "operaciones\n")
#> • Diferencia: 180 operaciones

# Analizar operaciones sin match en rolling limitado
operaciones_sin_precio <- operaciones_precio_limitado[is.na(precio_cierre)]
if(nrow(operaciones_sin_precio) > 0) {
  cat("• Operaciones sin precio (primeros días del año o fines de semana largos):", nrow(operaciones_sin_precio), "\n")
}
#> • Operaciones sin precio (primeros días del año o fines de semana largos): 389

6.2.3 3. Rolling Join Bidireccional (Nearest)

# Rolling join "nearest": buscar el precio más cercano (antes o después)
operaciones_nearest <- precios_key[operaciones_key, roll = "nearest"]

# Comparar diferentes tipos de rolling join
comparacion_rolling <- data.table(
  Tipo_Rolling = c("Backward (TRUE)", "Limited (7 days)", "Nearest"),
  Matches = c(
    sum(!is.na(operaciones_con_precio$precio_cierre)),
    sum(!is.na(operaciones_precio_limitado$precio_cierre)), 
    sum(!is.na(operaciones_nearest$precio_cierre))
  ),
  Cobertura_Pct = round(c(
    mean(!is.na(operaciones_con_precio$precio_cierre)) * 100,
    mean(!is.na(operaciones_precio_limitado$precio_cierre)) * 100,
    mean(!is.na(operaciones_nearest$precio_cierre)) * 100
  ), 1)
)

print("Comparación de tipos de rolling join:")
#> [1] "Comparación de tipos de rolling join:"
print(comparacion_rolling)
#>        Tipo_Rolling Matches Cobertura_Pct
#>              <char>   <int>         <num>
#> 1:  Backward (TRUE)     291          58.2
#> 2: Limited (7 days)     111          22.2
#> 3:          Nearest     500         100.0

6.2.4 4. Rolling Join con Sensores IoT

# Caso práctico: asociar eventos de mantenimiento con lecturas de sensores
setkey(sensores_iot, sensor_id, timestamp)
setkey(mantenimiento, sensor_id, fecha_mantenimiento)

# Rolling join para obtener la última lectura antes del mantenimiento
lecturas_pre_mantenimiento <- sensores_iot[mantenimiento, roll = TRUE]

# Verificar estructura del resultado
print("Columnas disponibles después del rolling join de sensores:")
#> [1] "Columnas disponibles después del rolling join de sensores:"
print(names(lecturas_pre_mantenimiento))
#> [1] "timestamp"          "sensor_id"          "valor"             
#> [4] "ubicacion"          "mantenimiento_id"   "tipo_mantenimiento"
#> [7] "duracion_horas"

# Análisis del estado de sensores antes del mantenimiento
if("fecha_mantenimiento" %in% names(lecturas_pre_mantenimiento)) {
  # Si fecha_mantenimiento existe directamente
  analisis_pre_mantenimiento <- lecturas_pre_mantenimiento[!is.na(valor),
    .(
      valor_promedio_pre = round(mean(valor), 2),
      valor_min_pre = min(valor),
      valor_max_pre = max(valor),
      eventos_mantenimiento = .N,
      tiempo_promedio_desde_lectura = round(mean(as.numeric(fecha_mantenimiento - timestamp) / 60), 1) # minutos
    ), by = .(sensor_id, tipo_mantenimiento)]
} else {
  # Buscar columnas con fecha o mantenimiento en el nombre
  fecha_cols <- grep("fecha|mantenimiento", names(lecturas_pre_mantenimiento), value = TRUE)
  print(paste("Columnas con 'fecha' o 'mantenimiento' encontradas:", paste(fecha_cols, collapse = ", ")))
  
  # Análisis simplificado sin cálculo temporal
  analisis_pre_mantenimiento <- lecturas_pre_mantenimiento[!is.na(valor),
    .(
      valor_promedio_pre = round(mean(valor), 2),
      valor_min_pre = min(valor),
      valor_max_pre = max(valor),
      eventos_mantenimiento = .N
    ), by = .(sensor_id, tipo_mantenimiento)]
}
#> [1] "Columnas con 'fecha' o 'mantenimiento' encontradas: mantenimiento_id, tipo_mantenimiento"

print("Análisis de sensores antes del mantenimiento:")
#> [1] "Análisis de sensores antes del mantenimiento:"
print(analisis_pre_mantenimiento[order(sensor_id, tipo_mantenimiento)])
#>       sensor_id tipo_mantenimiento valor_promedio_pre valor_min_pre
#>          <char>             <char>              <num>         <num>
#>  1:    HUMID_01        Calibracion              82.65         82.65
#>  2:    HUMID_01          Reemplazo             290.32         24.02
#>  3: PRESSURE_01           Limpieza             508.30         20.37
#>  4: PRESSURE_01          Reemplazo              19.74         19.74
#>  5:     TEMP_01        Calibracion             992.82        992.82
#>  6:     TEMP_01           Limpieza              22.69         22.69
#>  7:     TEMP_01          Reemplazo             529.52         55.85
#>  8:     TEMP_02        Calibracion              73.00         73.00
#>  9:     TEMP_02           Limpieza              21.75         20.97
#> 10:     TEMP_02          Reemplazo              29.41         20.87
#>     valor_max_pre eventos_mantenimiento
#>             <num>                 <int>
#>  1:         82.65                     1
#>  2:       1035.33                     4
#>  3:        996.24                     2
#>  4:         19.74                     1
#>  5:        992.82                     1
#>  6:         22.69                     1
#>  7:       1003.20                     2
#>  8:         73.00                     1
#>  9:         22.45                     3
#> 10:         51.82                     4

6.3 Update Joins Avanzados con Condiciones

6.3.1 1. Update Join Condicional

# Update join para marcar operaciones riesgosas
# Crear tabla de límites de riesgo por ticker
limites_riesgo <- data.table(
  ticker = c("AAPL", "GOOGL", "MSFT", "TSLA", "AMZN"),
  precio_max_seguro = c(200, 3000, 400, 250, 150),
  volumen_max_seguro = c(1000, 500, 800, 2000, 1200)
)

# Hacer copia para update join
operaciones_riesgo <- copy(operaciones_trading)

# Update join condicional
operaciones_riesgo[limites_riesgo, on = .(ticker),
                  `:=`(
                    precio_limite_riesgoso = i.precio_max_seguro < precio_limite,
                    volumen_riesgoso = i.volumen_max_seguro < cantidad,
                    limite_precio_ref = i.precio_max_seguro,
                    limite_volumen_ref = i.volumen_max_seguro
                  )]

# Clasificar nivel de riesgo general
operaciones_riesgo[, nivel_riesgo := fcase(
  precio_limite_riesgoso & volumen_riesgoso, "Alto Riesgo",
  precio_limite_riesgoso | volumen_riesgoso, "Riesgo Moderado",
  default = "Bajo Riesgo"
)]

# Resumen de riesgos
resumen_riesgos <- operaciones_riesgo[, .N, by = .(ticker, nivel_riesgo)][order(ticker, nivel_riesgo)]
print("Distribución de riesgo por ticker:")
#> [1] "Distribución de riesgo por ticker:"
print(resumen_riesgos)
#>     ticker    nivel_riesgo     N
#>     <char>          <char> <int>
#>  1:   AAPL     Bajo Riesgo     3
#>  2:   AAPL Riesgo Moderado    91
#>  3:   AMZN     Bajo Riesgo     1
#>  4:   AMZN Riesgo Moderado   105
#>  5:  GOOGL     Bajo Riesgo    83
#>  6:  GOOGL Riesgo Moderado    21
#>  7:   MSFT     Alto Riesgo    16
#>  8:   MSFT     Bajo Riesgo     9
#>  9:   MSFT Riesgo Moderado    74
#> 10:   TSLA     Bajo Riesgo     3
#> 11:   TSLA Riesgo Moderado    94

6.3.2 2. Update Join con Agregaciones Complejas

# Calcular estadísticas móviles y actualizar tabla principal
estadisticas_ticker <- operaciones_trading[,
  .(
    operaciones_historicas = .N,
    precio_promedio_historico = round(mean(precio_limite), 2),
    volumen_promedio_historico = round(mean(cantidad), 0),
    precio_max_historico = max(precio_limite),
    precio_min_historico = min(precio_limite),
    ratio_compra_venta = round(mean(tipo == "COMPRA"), 2)
  ), by = ticker]

# Update join para agregar contexto histórico
operaciones_riesgo[estadisticas_ticker, on = .(ticker),
                  `:=`(
                    percentil_precio = round((precio_limite - i.precio_min_historico) / 
                                           (i.precio_max_historico - i.precio_min_historico) * 100, 1),
                    precio_vs_promedio = round(precio_limite / i.precio_promedio_historico, 2),
                    volumen_vs_promedio = round(cantidad / i.volumen_promedio_historico, 2),
                    operaciones_ticker_total = i.operaciones_historicas
                  )]

print("Operaciones con contexto histórico:")
#> [1] "Operaciones con contexto histórico:"
print(head(operaciones_riesgo[, .(ticker, precio_limite, percentil_precio, 
                                 precio_vs_promedio, volumen_vs_promedio, nivel_riesgo)], 10))
#>     ticker precio_limite percentil_precio precio_vs_promedio
#>     <char>         <num>            <num>              <num>
#>  1:   AAPL       2596.34             87.1               1.60
#>  2:   MSFT       2878.99             97.6               1.82
#>  3:   MSFT        264.94              5.7               0.17
#>  4:   AAPL       2745.92             92.4               1.69
#>  5:   AMZN       2631.01             87.5               1.66
#>  6:   TSLA       1407.00             45.3               0.92
#>  7:   AMZN       2465.81             81.7               1.56
#>  8:  GOOGL       1629.14             53.4               1.09
#>  9:  GOOGL        910.92             28.1               0.61
#> 10:   TSLA       2405.32             80.6               1.58
#>     volumen_vs_promedio    nivel_riesgo
#>                   <num>          <char>
#>  1:                2.34 Riesgo Moderado
#>  2:                0.14 Riesgo Moderado
#>  3:                0.14     Bajo Riesgo
#>  4:                2.34 Riesgo Moderado
#>  5:                0.12 Riesgo Moderado
#>  6:                0.29 Riesgo Moderado
#>  7:                0.25 Riesgo Moderado
#>  8:                2.70 Riesgo Moderado
#>  9:                0.27     Bajo Riesgo
#> 10:                0.15 Riesgo Moderado

6.4 Casos de Uso Complejos: Combinando Técnicas

6.4.1 1. Pipeline Completo: Finanzas

# Pipeline complejo que combina non-equi y rolling joins
# Paso a paso para facilitar el debugging
step1 <- operaciones_trading[
  # 1. Rolling join para obtener precios históricos
  precios_historicos, on = .(ticker, fecha_operacion = fecha), roll = TRUE
][
  # 2. Filtrar solo operaciones con precio disponible
  !is.na(precio_cierre)
][
  # 3. Calcular métricas de trading
  , `:=`(
    diferencia_precio = round(precio_limite - precio_cierre, 2),
    ratio_precio = round(precio_limite / precio_cierre, 3),
    valor_operacion = precio_limite * cantidad
  )
]

# Verificar columnas antes del non-equi join
print("Columnas disponibles antes del non-equi join:")
print(names(step1))

step2 <- step1[
  # 4. Non-equi join con eventos de mercado para ventanas temporales
  eventos_mercado, 
  on = .(ticker = ticker_afectado,
         fecha_operacion >= fecha_inicio_ventana,
         fecha_operacion <= fecha_fin_ventana),
  allow.cartesian = TRUE
]

# Verificar columnas después del non-equi join
print("Columnas disponibles después del non-equi join:")
print(names(step2))

# Determinar qué columna usar para ticker en la agrupación
ticker_col <- if("ticker_afectado" %in% names(step2)) "ticker_afectado" else "ticker"

# 5. Agregar análisis por evento
if("ticker_afectado" %in% names(step2)) {
  pipeline_financiero <- step2[
    , .(
      operaciones_en_ventana = .N,
      valor_total = sum(valor_operacion),
      precio_promedio_limite = round(mean(precio_limite), 2),
      precio_promedio_mercado = round(mean(precio_cierre), 2),
      spread_promedio = round(mean(abs(diferencia_precio)), 2),
      operaciones_compra = sum(tipo == "COMPRA"),
      operaciones_venta = sum(tipo == "VENTA")
    ), by = .(evento_id, tipo_evento, ticker_afectado, impacto_esperado)
  ][
    operaciones_en_ventana >= 3  # Solo eventos con suficiente actividad
  ][
    order(-valor_total)
  ]
} else {
  # Usar ticker en lugar de ticker_afectado
  pipeline_financiero <- step2[
    , .(
      operaciones_en_ventana = .N,
      valor_total = sum(valor_operacion),
      precio_promedio_limite = round(mean(precio_limite), 2),
      precio_promedio_mercado = round(mean(precio_cierre), 2),
      spread_promedio = round(mean(abs(diferencia_precio)), 2),
      operaciones_compra = sum(tipo == "COMPRA"),
      operaciones_venta = sum(tipo == "VENTA")
    ), by = .(evento_id, tipo_evento, ticker, impacto_esperado)
  ][
    operaciones_en_ventana >= 3  # Solo eventos con suficiente actividad
  ][
    order(-valor_total)
  ]
}

print("Análisis de trading en ventanas de eventos:")
print(head(pipeline_financiero, 10))

# # Crear tabla interactiva del análisis (comentado para PDF)
# DT::datatable(
#   pipeline_financiero,
#   caption = "Análisis de Trading en Ventanas de Eventos de Mercado",
#   options = list(pageLength = 8, scrollX = TRUE)
# ) %>%
#   DT::formatCurrency("valor_total", currency = "$") %>%
#   DT::formatRound(c("precio_promedio_limite", "precio_promedio_mercado", "spread_promedio"), digits = 2)

6.4.2 2. Pipeline Médico Avanzado

# Pipeline médico combinando múltiples non-equi joins
evaluacion_medica_completa <- pacientes_clinica[
  # Añadir columna de edad en décadas para agrupación
  , decada := paste0(floor(edad/10)*10, "s")
][
  # 1. Clasificar por IMC
  rangos_medicos[parametro == "IMC"], 
  on = .(imc >= valor_min, imc <= valor_max),
  .(paciente_id, nombre, edad, decada, peso, altura, imc, 
    categoria_imc = categoria, riesgo_imc = nivel_riesgo,
    glucosa, presion_sistolica, colesterol)
][
  # 2. Clasificar por glucosa  
  rangos_medicos[parametro == "Glucosa"],
  on = .(glucosa >= valor_min, glucosa <= valor_max),
  .(paciente_id, nombre, edad, decada, peso, altura, imc, 
    categoria_imc, riesgo_imc, glucosa,
    categoria_glucosa = i.categoria, riesgo_glucosa = i.nivel_riesgo,
    presion_sistolica, colesterol)
][
  # 3. Clasificar por presión
  rangos_medicos[parametro == "Presión"],
  on = .(presion_sistolica >= valor_min, presion_sistolica <= valor_max),
  .(paciente_id, nombre, edad, decada, peso, altura, imc,
    categoria_imc, riesgo_imc, glucosa, categoria_glucosa, riesgo_glucosa,
    presion_sistolica, categoria_presion = i.categoria, riesgo_presion = i.nivel_riesgo,
    colesterol)
][
  # 4. Calcular score de riesgo compuesto
  , `:=`(
    factores_riesgo_alto = (riesgo_imc == "Alto") + (riesgo_glucosa == "Alto") + (riesgo_presion == "Alto"),
    factores_riesgo_total = 3,
    score_riesgo = (
      (riesgo_imc == "Alto") * 3 + (riesgo_imc == "Normal") * 1 +
      (riesgo_glucosa == "Alto") * 3 + (riesgo_glucosa == "Normal") * 1 +
      (riesgo_presion == "Alto") * 3 + (riesgo_presion == "Normal") * 1
    )
  )
][
  # 5. Clasificación final de riesgo
  , clasificacion_final := fcase(
    factores_riesgo_alto >= 2, "Paciente Alto Riesgo - Seguimiento Inmediato",
    factores_riesgo_alto == 1, "Paciente Riesgo Moderado - Seguimiento Regular", 
    score_riesgo >= 6, "Paciente Bajo Riesgo - Seguimiento Rutinario",
    default = "Paciente Muy Bajo Riesgo - Seguimiento Anual"
  )
][
  order(-score_riesgo)
]

# Resumen por grupo de edad
resumen_por_decada <- evaluacion_medica_completa[,
  .(
    pacientes = .N,
    edad_promedio = round(mean(edad), 1),
    imc_promedio = round(mean(imc), 1),
    alto_riesgo = sum(factores_riesgo_alto >= 2),
    riesgo_moderado = sum(factores_riesgo_alto == 1),
    bajo_riesgo = sum(factores_riesgo_alto == 0),
    score_riesgo_promedio = round(mean(score_riesgo), 1)
  ), by = decada][order(decada)]

print("Resumen de evaluación médica por década:")
print(resumen_por_decada)

print("\nPacientes de mayor riesgo:")
print(head(evaluacion_medica_completa[factores_riesgo_alto >= 2, 
                                     .(nombre, edad, factores_riesgo_alto, score_riesgo, clasificacion_final)], 8))

6.5 Ejercicios Prácticos

🏋️ Ejercicio 10: Sistema de Alertas de Trading

Usando los datasets de precios y operaciones:

  1. Rolling join para obtener precios en tiempo real
  2. Non-equi join para identificar operaciones en rangos de volatilidad alta
  3. Update join para calcular PnL potencial
  4. Crear sistema de alertas basado en múltiples condiciones
# Sistema completo de alertas de trading
# 1. Calcular volatilidad histórica
volatilidad_historica <- precios_historicos[order(ticker, fecha)][,
  .(fecha, volatilidad_10d = frollapply(precio_cierre, 10, sd, na.rm = TRUE)),
  by = ticker][!is.na(volatilidad_10d)]

# 2. Definir rangos de volatilidad para non-equi join
rangos_volatilidad <- data.table(
  nivel = c("Baja", "Media", "Alta", "Extrema"),
  vol_min = c(0, 5, 15, 30),
  vol_max = c(5, 15, 30, Inf),
  factor_riesgo = c(1, 2, 3, 4)
)

# 3. Pipeline completo del sistema de alertas
sistema_alertas <- operaciones_trading[
  # Rolling join con precios históricos
  precios_historicos, on = .(ticker, fecha_operacion = fecha), roll = TRUE
][
  !is.na(precio_cierre)  # Solo operaciones con precio disponible
][
  # Rolling join con volatilidad
  volatilidad_historica, on = .(ticker, fecha_operacion = fecha), roll = TRUE
][
  !is.na(volatilidad_10d)  # Solo con volatilidad calculada
][
  # Non-equi join para clasificar por volatilidad
  rangos_volatilidad, on = .(volatilidad_10d >= vol_min, volatilidad_10d <= vol_max),
  .(operacion_id, ticker, fecha_operacion, tipo, cantidad, precio_limite, precio_cierre,
    volatilidad_10d, nivel_volatilidad = nivel, factor_riesgo)
][
  # 4. Calcular métricas de riesgo y PnL potencial
  , `:=`(
    diferencia_precio = precio_limite - precio_cierre,
    pnl_potencial_pct = round((precio_limite - precio_cierre) / precio_cierre * 100, 2),
    valor_operacion = precio_limite * cantidad,
    riesgo_volatilidad = volatilidad_10d * factor_riesgo,
    spread_pct = round(abs(precio_limite - precio_cierre) / precio_cierre * 100, 2)
  )
][
  # 5. Sistema de alertas basado en múltiples condiciones
  , alerta_tipo := fcase(
    # Alerta crítica: alta volatilidad + spread alto + operación grande
    nivel_volatilidad %in% c("Alta", "Extrema") & spread_pct > 5 & valor_operacion > 100000,
    "CRÍTICA - Alta Volatilidad + Spread Alto + Volumen Alto",
    
    # Alerta alta: precio muy diferente del mercado
    abs(pnl_potencial_pct) > 10 & valor_operacion > 50000,
    "ALTA - Precio Fuera de Rango + Volumen Significativo",
    
    # Alerta media: volatilidad alta
    nivel_volatilidad %in% c("Alta", "Extrema") & valor_operacion > 25000,
    "MEDIA - Alta Volatilidad",
    
    # Alerta baja: spread moderado
    spread_pct > 3 & valor_operacion > 10000,
    "BAJA - Spread Moderado",
    
    default = "SIN ALERTA"
  )
][
  # 6. Filtrar solo operaciones con alertas
  alerta_tipo != "SIN ALERTA"
][
  order(-riesgo_volatilidad, -valor_operacion)
][
  # 7. Agregar prioridad numérica para ordenamiento
  , prioridad := fcase(
    grepl("CRÍTICA", alerta_tipo), 1,
    grepl("ALTA", alerta_tipo), 2, 
    grepl("MEDIA", alerta_tipo), 3,
    grepl("BAJA", alerta_tipo), 4,
    default = 5
  )
][
  order(prioridad, -valor_operacion)
]

# Dashboard de alertas
cat("🚨 SISTEMA DE ALERTAS DE TRADING 🚨\n\n")
#> 🚨 SISTEMA DE ALERTAS DE TRADING 🚨

# Resumen por tipo de alerta
resumen_alertas <- sistema_alertas[, .(
  operaciones = .N,
  valor_total = sum(valor_operacion),
  volatilidad_promedio = round(mean(volatilidad_10d), 2),
  spread_promedio = round(mean(spread_pct), 2)
), by = .(alerta_tipo, prioridad)][order(prioridad)]

print("RESUMEN DE ALERTAS:")
#> [1] "RESUMEN DE ALERTAS:"
print(resumen_alertas)
#>                                                alerta_tipo prioridad
#>                                                     <char>     <num>
#> 1: CRÍTICA - Alta Volatilidad + Spread Alto + Volumen Alto         1
#> 2:    ALTA - Precio Fuera de Rango + Volumen Significativo         2
#> 3:                                MEDIA - Alta Volatilidad         3
#> 4:                                  BAJA - Spread Moderado         4
#>    operaciones valor_total volatilidad_promedio spread_promedio
#>          <int>       <num>                <num>           <num>
#> 1:          61    41874580                16.23          319.11
#> 2:         197   100551879                 5.58          500.71
#> 3:          10      353714                15.00          116.39
#> 4:          59     7979808                 6.78          172.47

cat("\n📊 TOP 10 OPERACIONES DE MAYOR RIESGO:\n")
#> 
#> 📊 TOP 10 OPERACIONES DE MAYOR RIESGO:
print(sistema_alertas[1:10, .(
  Ticker = ticker, 
  Tipo = tipo,
  Valor = paste0("$", format(valor_operacion, big.mark = ",")),
  Spread = paste0(spread_pct, "%"),
  Volatilidad = paste0(round(volatilidad_10d, 1)),
  Alerta = alerta_tipo
)])
#>     Ticker   Tipo      Valor  Spread Volatilidad
#>     <char> <char>     <char>  <char>      <char>
#>  1:   TSLA  VENTA $2,161,110 683.66%          15
#>  2:   TSLA  VENTA $2,161,110 680.32%          15
#>  3:   TSLA  VENTA $2,161,110 623.26%          15
#>  4:   TSLA  VENTA $2,161,110 518.59%          15
#>  5:   TSLA  VENTA $2,161,110 544.22%          15
#>  6:   TSLA  VENTA $2,161,110 514.93%          15
#>  7:  GOOGL COMPRA $1,463,000  47.18%          15
#>  8:  GOOGL COMPRA $1,463,000  46.66%          15
#>  9:  GOOGL COMPRA $1,463,000  47.21%          15
#> 10:   TSLA COMPRA $1,350,470  351.9%          15
#>                                                      Alerta
#>                                                      <char>
#>  1: CRÍTICA - Alta Volatilidad + Spread Alto + Volumen Alto
#>  2: CRÍTICA - Alta Volatilidad + Spread Alto + Volumen Alto
#>  3: CRÍTICA - Alta Volatilidad + Spread Alto + Volumen Alto
#>  4: CRÍTICA - Alta Volatilidad + Spread Alto + Volumen Alto
#>  5: CRÍTICA - Alta Volatilidad + Spread Alto + Volumen Alto
#>  6: CRÍTICA - Alta Volatilidad + Spread Alto + Volumen Alto
#>  7: CRÍTICA - Alta Volatilidad + Spread Alto + Volumen Alto
#>  8: CRÍTICA - Alta Volatilidad + Spread Alto + Volumen Alto
#>  9: CRÍTICA - Alta Volatilidad + Spread Alto + Volumen Alto
#> 10: CRÍTICA - Alta Volatilidad + Spread Alto + Volumen Alto

# # Crear tabla interactiva (comentado para PDF)
# DT::datatable(
#   sistema_alertas[1:20],
#   caption = "Sistema de Alertas de Trading - Top 20 Operaciones de Riesgo",
#   options = list(pageLength = 10, scrollX = TRUE)
# ) %>%
#   DT::formatCurrency("valor_operacion", currency = "$") %>%
#   DT::formatRound(c("volatilidad_10d", "spread_pct"), digits = 2) %>%
#   DT::formatStyle(
#     "alerta_tipo",
#     backgroundColor = DT::styleEqual(
#       c("CRÍTICA - Alta Volatilidad + Spread Alto + Volumen Alto",
#         "ALTA - Precio Fuera de Rango + Volumen Significativo", 
#         "MEDIA - Alta Volatilidad",
#         "BAJA - Spread Moderado"),
#       c("red", "orange", "yellow", "lightblue")
#     )
#   )

6.6 Mejores Prácticas para Joins Avanzados

6.6.1 1. Performance y Optimización

# ✅ HACER: Establecer keys antes de rolling joins repetitivos
setkey(tabla_temporal, id, timestamp)
setkey(tabla_eventos, id, fecha_evento)
resultado <- tabla_temporal[tabla_eventos, roll = TRUE]

# ✅ HACER: Filtrar antes de joins complejos
tabla_filtrada <- tabla_grande[fecha >= fecha_inicio & fecha <= fecha_fin]
resultado <- tabla_filtrada[otra_tabla, on = .(columna)]

# ✅ HACER: Usar nomatch = NULL para inner joins en non-equi
resultado <- tabla1[tabla2, on = .(col1 >= min_val, col1 <= max_val), nomatch = NULL]

# ❌ EVITAR: Non-equi joins sin filtros previos en tablas enormes
# Puede generar productos cartesianos masivos

6.6.2 2. Manejo de Casos Edge

# ✅ HACER: Validar resultados de rolling joins
resultado <- tabla1[tabla2, roll = TRUE]
cat("Matches encontrados:", sum(!is.na(resultado$columna_tabla1)), "\n")
cat("Matches perdidos:", sum(is.na(resultado$columna_tabla1)), "\n")

# ✅ HACER: Establecer límites razonables en rolling joins
resultado <- tabla1[tabla2, roll = 7]  # máximo 7 unidades de tiempo

# ✅ HACER: Verificar cartesian products en non-equi joins
if(nrow(resultado) > nrow(tabla2) * 2) {
  warning("Posible cartesian product no deseado")
}

🎯 Puntos Clave de Este Capítulo
  1. Non-equi joins permiten uniones basadas en rangos y desigualdades - perfectos para clasificaciones médicas y financieras
  2. Rolling joins son esenciales para series temporales - conectan cada punto con el último valor disponible
  3. Combinar técnicas (non-equi + rolling + update) permite análisis muy sofisticados
  4. Performance: Establecer keys apropiadas es crucial para joins avanzados
  5. Validación: Siempre verificar resultados para evitar cartesian products no deseados
  6. Casos de uso reales: Finanzas, medicina, IoT - cualquier dominio con rangos temporales o de valores

Los joins avanzados son herramientas poderosas que abren posibilidades analíticas únicas. En el próximo capítulo exploraremos las funciones especiales que complementan estos joins para análisis aún más sofisticados.