실시간 정책 제약 하에서 산불 대피 물류 네트워크를 위한 Self-Supervised Temporal Pattern Mining

발행: (2025년 12월 30일 오전 06:24 GMT+9)
11 min read
원문: Dev.to

Source: Dev.to

Introduction: 이 연구를 촉발한 학습 여정

2023년 산불 시즌에, 나는 북부 캘리포니아에서 대피 경로 실패를 분석하던 중 돌파구를 발견했다. 대피 병목 현상을 예측하기 위해 전통적인 지도학습 모델을 실험했지만, 대피 중에 정책 제약이 변경될 때마다 모델이 계속 실패했다. 모델은 과거 데이터를 기반으로 학습했지만, 갑작스러운 도로 폐쇄나 대피소 수용 능력 변화와 같은 실시간 정책 변동은 모델을 사실상 쓸모 없게 만들었다.

컴퓨터 비전 분야의 self‑supervised learning 논문을 탐색하던 중 흥미로운 사실을 발견했다: 라벨이 없는 이미지에서 표현을 학습하는 기술을 대피 물류의 시간 순서 데이터에 적용할 수 있다는 것이다. contrastive‑learning 접근법을 연구하면서, 대피 데이터의 서로 다른 시간 창을 동일한 기본 프로세스의 별도 “뷰”로 취급하면 명시적 라벨 없이도 강인한 시간 패턴을 학습하는 모델을 만들 수 있음을 알게 되었다. 이것이 바로 내가 self‑supervised temporal pattern mining for wildfire evacuation networks 에 대한 연구를 시작하게 된 계기였다.

산불 대피 물류는 비상 관리에서 가장 어려운 temporal‑optimization 문제 중 하나이다. 시스템은 여러 동적 요소를 포함한다:

  • Temporal patterns in fire spread (시간당/일별 사이클, 날씨 의존성)
  • Human‑behavior patterns (대피‑결정 시점, 경로 선호)
  • Infrastructure dynamics (도로‑용량 악화, 통신‑네트워크 장애)
  • Policy constraints (대피 명령, 자원‑배분 규칙, 관할 구역 경계)

기존 대피 모델을 조사하면서 대부분의 접근법이 이러한 요소들을 독립적으로 다루거나 상호작용에 대해 단순화된 가정을 사용한다는 것을 발견했다. 돌파구는 전체 대피 생태계를 temporal graph 로 바라보기 시작했을 때 찾아왔다. 여기서 노드는 의사결정 지점을, 엣지는 시간적 의존성을 나타낸다.

최근 self‑supervised learning의 발전을 공부하면서, 시간 데이터에 대한 핵심 통찰은 모델이 유용한 표현을 학습하도록 강제하는 의미 있는 pre‑text tasks 를 만드는 것임을 알았다. 대피 네트워크를 위해 나는 세 가지 핵심 pre‑text task 를 개발했다:

  1. Temporal contrastive prediction – 정상적인 시간 패턴과 이상 패턴을 구별하도록 학습.
  2. Masked temporal modeling – 시간 순서의 누락된 구간을 예측.
  3. Temporal alignment – 서로 다른 시간 스케일 간 패턴을 정렬하도록 학습.

이러한 작업을 실험하면서 temporal contrastive learning 이 정책 제약이 있는 시나리오에서 가장 강인한 표현을 만든다는 흥미로운 결과를 얻었다. 모델은 명시적 감독 없이도 시간 패턴이 정책 제약을 위반했는지를 인식하게 되었다.

시간 데이터에 대한 transformer 구조를 탐구하면서 나는 temporal convolutional networksattention mechanisms 를 결합한 하이브리드 모델을 개발했다. 핵심 혁신은 constraint‑aware masking 을 통해 정책 제약을 직접 attention 메커니즘에 통합한 것이었다.

정책‑제한 시간 주의 (PyTorch)

import torch
import torch.nn as nn
import torch.nn.functional as F

class PolicyConstrainedTemporalAttention(nn.Module):
    def __init__(self, d_model, n_heads, max_seq_len=96):
        super().__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.head_dim = d_model // n_heads

        # Linear projections
        self.query = nn.Linear(d_model, d_model)
        self.key   = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)

        # Policy‑constraint embeddings (10 constraint types)
        self.policy_embedding = nn.Embedding(10, d_model)
        # Positional embeddings for temporal information
        self.temporal_position = nn.Embedding(max_seq_len, d_model)

    def forward(self, x, policy_mask, temporal_positions):
        """
        x                : [batch, seq_len, d_model]
        policy_mask      : [batch, seq_len] (indices of constraint types)
        temporal_positions: [batch, seq_len] (position indices)
        """
        batch_size, seq_len, _ = x.shape

        # Add temporal and policy information
        x = x + self.temporal_position(temporal_positions)
        x = x + self.policy_embedding(policy_mask)

        # Multi‑head projections
        Q = self.query(x).view(batch_size, seq_len, self.n_heads, self.head_dim)
        K = self.key(x).view(batch_size, seq_len, self.n_heads, self.head_dim)
        V = self.value(x).view(batch_size, seq_len, self.n_heads, self.head_dim)

        # Scaled dot‑product attention
        attn_scores = torch.einsum('bqhd,bkhd->bhqk', Q, K) / (self.head_dim ** 0.5)

        # Apply policy‑constraint mask (0 = forbidden)
        policy_mask_matrix = self._create_policy_mask(policy_mask)  # [batch, heads, seq_len, seq_len]
        attn_scores = attn_scores.masked_fill(policy_mask_matrix == 0, float('-inf'))

        attn_weights = F.softmax(attn_scores, dim=-1)
        out = torch.einsum('bhqk,bkhd->bqhd', attn_weights, V)
        out = out.reshape(batch_size, seq_len, self.d_model)

        return out

    def _create_policy_mask(self, policy_mask):
        """
        Dummy implementation – replace with actual logic that creates a
        [batch, heads, seq_len, seq_len] mask based on policy constraints.
        """
        batch, seq_len = policy_mask.shape
        mask = torch.ones(batch, self.n_heads, seq_len, seq_len, device=policy_mask.device)
        # Example: zero‑out attention where constraint type == 0
        mask[:, :, :, :] = (policy_mask.unsqueeze(1).unsqueeze(-1) != 0).float()
        return mask

Temporal Contrastive Loss (PyTorch)

class TemporalContrastiveLoss(nn.Module):
    def __init__(self, temperature=0.1, temporal_window=6):
        super().__init__()
        self.temperature = temperature
        self.temporal_window = temporal_window

    def forward(self, embeddings, temporal_labels):
        """
        embeddings      : [batch, seq_len, embed_dim]
        temporal_labels : [batch, seq_len] (segment identifiers)
        """
        batch_size, seq_len, embed_dim = embeddings.shape
        loss = 0.0

        # Iterate over sliding windows to form anchor‑positive pairs
        for i in range(seq_len - self.temporal_window):
            # Anchor: mean embedding of the current window
            anchor = embeddings[:, i:i + self.temporal_window].mean(dim=1)  # [batch, embed_dim]

            # Positive: mean embedding of the next window (temporal proximity)
            positive = embeddings[:, i + 1:i + 1 + self.temporal_window].mean(dim=1)

            # Compute cosine similarity
            sim_pos = F.cosine_similarity(anchor, positive) / self.temperature

            # Negatives: all other windows in the batch
            neg_mask = torch.arange(seq_len - self.temporal_window) != i
            negatives = embeddings[:, neg_mask][:, :, :].view(batch_size, -1, embed_dim)
            neg_sim = F.cosine_similarity(
                anchor.unsqueeze(1), negatives, dim=-1
            ) / self.temperature  # [batch, num_neg]

            # InfoNCE loss
            logits = torch.cat([sim_pos.unsqueeze(1), neg_sim], dim=1)  # [batch, 1+num_neg]
            labels = torch.zeros(batch_size, dtype=torch.long, device=logits.device)  # anchor is class 0
            loss += F.cross_entropy(logits, labels)

        return loss / (seq_len - self.temporal_window)

요약

시간적 대조 학습정책 인식 어텐션 메커니즘과 결합하면, 실시간 대피 정책이 변동하더라도 견고한 표현을 얻을 수 있습니다. 이 프레임워크는 홍수 대응, 팬데믹 물류, 대규모 전력망 복구와 같은 시간에 민감하고 제약이 많은 다른 도메인에도 확장하여 적용할 수 있습니다.

대비 손실 계산

# Positive: nearby temporal window
pos_start = i + self.temporal_window
pos_end = pos_start + self.temporal_window
positive = embeddings[:, pos_start:pos_end].mean(dim=1)

# Negatives: distant temporal windows
negative_indices = torch.randint(0, seq_len, (batch_size, 10))
negatives = embeddings[torch.arange(batch_size).unsqueeze(1),
                        negative_indices].mean(dim=1)

# Compute contrastive loss
pos_sim = F.cosine_similarity(anchor, positive, dim=-1)
neg_sim = F.cosine_similarity(anchor.unsqueeze(1), negatives, dim=-1)

logits = torch.cat([pos_sim.unsqueeze(1), neg_sim], dim=1) / self.temperature
labels = torch.zeros(batch_size, dtype=torch.long, device=embeddings.device)

loss += F.cross_entropy(logits, labels)

return loss / (seq_len - self.temporal_window)

차별화 가능한 정책 레이어 (PyTorch)

class DifferentiablePolicyLayer(nn.Module):
    def __init__(self, constraint_types, max_constraints=5):
        super().__init__()
        self.constraint_types = constraint_types
        self.constraint_encoder = nn.Linear(constraint_types, 128)
        self.temporal_projection = nn.Linear(128, 256)

    def forward(self, temporal_patterns, policy_constraints, current_time):
        """
        temporal_patterns:   [batch_size, seq_len, features]
        policy_constraints:  [batch_size, num_constraints, constraint_dim]
        current_time:        scalar representing current time step
        """
        batch_size, seq_len, _ = temporal_patterns.shape

        # Encode policy constraints
        constraint_emb = self.constraint_encoder(policy_constraints)
        constraint_emb = torch.mean(constraint_emb, dim=1)  # Aggregate constraints

        # Project to temporal dimension
        temporal_constraints = self.temporal_projection(constraint_emb)
        temporal_constraints = temporal_constraints.unsqueeze(1).expand(-1, seq_len, -1)

        # Apply constraints as attention modulation
        constrained_patterns = temporal_patterns * torch.sigmoid(temporal_constraints)

        # Time‑aware constraint enforcement
        time_weights = self._compute_time_weights(current_time, seq_len)
        constrained_patterns = constrained_patterns * time_weights.unsqueeze(-1)

        return constrained_patterns

    def _compute_time_weights(self, current_time, seq_len):
        """Compute weights based on temporal proximity to policy changes."""
        time_steps = torch.arange(seq_len, device=self.constraint_encoder.weight.device)
        time_diff = torch.abs(time_steps - current_time)
        weights = torch.exp(-time_diff / 10.0)  # Exponential decay
        return weights

Temporal Route Optimizer (PyTorch)

class TemporalRouteOptimizer:
    def __init__(self, pattern_miner, constraint_manager):
        self.pattern_miner = pattern_miner
        self.constraint_manager = constraint_manager

    def optimize_evacuation_routes(self, current_state, time_horizon, policy_updates):
        """
        current_state:   현재 대피 네트워크 상태
        time_horizon:   최적화할 미래 시간 단계 수
        policy_updates: 실시간 정책 변경 사항
        """
        # 현재 상태에서 시간적 패턴 추출
        temporal_features = self._extract_temporal_features(current_state)

        # 정책 제약 적용
        constrained_features = self.constraint_manager.apply_constraints(
            temporal_features, policy_updates
        )

        # 시간적 패턴 마이닝
        patterns = self.pattern_miner.mine_patterns(constrained_features)

        # 대피 계획 생성
        plans = []
        for t in range(time_horizon):
            # 학습된 패턴을 사용해 미래 상태 예측
            future_state = self._predict_state(patterns, t)

            # 해당 시간 단계에 대한 경로 최적화
            routes = self._optimize_routes(future_state, policy_updates)
            plans.append(routes)

            # 새로운 정보를 기반으로 패턴 업데이트
            patterns = self._update_patterns(patterns, routes)

        return plans

    def _extract_temporal_features(self, state):
        """네트워크 상태에서 시간적 특성 추출."""
        features = []
        # 도로 네트워크 시간적 특성
        features.append(state['road_congestion_trend'])
        features.append(state['evacuation_rate'])
        features.append(state['resource_availability'])

        # 환경 시간적 특성
        features.append(state['fire_spread_rate'])
        features.append(state['weather_conditions'])

        return torch.stack(features, dim=-1)

정책 적응 주의 (PyTorch)

class PolicyAdaptiveAttention(nn.Module):
    def __init__(self, base_model, adaptation_layers=3):
        super().__init__()
        self.base_model = base_mod  # Note: original variable name truncated; ensure correct reference
        # Additional adaptation layers would be defined here
        # ...

    # Placeholder for further implementation

정책‑조건 레이어가 포함된 적응 모델

self.adaptation_layers = nn.ModuleList([
    nn.Linear(base_model.hidden_size, base_model.hidden_size)
    for _ in range(adaptation_layers)
])

def forward(self, x, new_policy_constraints):
    # Get base representations
    base_repr = self.base_model(x)

    # Rapid adaptation to new policies
    adapted_repr = base_repr
    for layer in self.adaptation_layers:
        # Concatenate policy information
        policy_expanded = new_policy_constraints.unsqueeze(1).expand(
            -1, adapted_repr.size(1), -1
        )
        combined = torch.cat([adapted_repr, policy_expanded], dim=-1)

        # Apply adaptation (residual connection)
        adapted_repr = layer(combined) + adapted_repr

    return adapted_repr

합성 대피 데이터 생성 (PyTorch)

class SyntheticEvacuationGenerator:
    def __init__(self, pattern_miner, physics_simulator):
        self.pattern_miner = pattern_miner
        self.physics_simulator = physics_simulator

    def generate_scenarios(self, base_patterns, num_scenarios, variability=0.3):
        """Generate synthetic evacuation scenarios."""
        scenarios = []

        for _ in range(num_scenarios):
            # Sample from learned patterns
            pattern_idx = torch.randint(0, len(base_patterns), (1,))
            base_pattern = base_patterns[pattern_idx]

            # Apply realistic variations
            varied_pattern = self._apply_variations(base_pattern, variability)

            # Simulate physics‑based constraints
            physics_constraints = self.physics_simulator.simulate(varied_pattern)

            # Combine patterns with physics
            full_scenario = self._combine_patterns(varied_pattern, physics_constraints)
            scenarios.append(full_scenario)

        return torch.stack(scenarios)

엣지 배포를 위한 지식 증류 (PyTorch)

class TemporalKnowledgeDistillation:
    def __init__(self, teacher_model, student_model, temperature=2.0):
        self.teacher = teacher_model
        self.student = student_model
        self.temperature = temperature
        self.criterion = nn.KLDivLoss(reduction='batchmean')

    def distill(self, data_loader, optimizer, epochs=5):
        self.teacher.eval()
        self.student.train()

        for epoch in range(epochs):
            for batch in data_loader:
                inputs = batch['inputs']
                with torch.no_grad():
                    teacher_logits = self.teacher(inputs) / self.temperature

                student_logits = self.student(inputs) / self.temperature
                loss = self.criterion(
                    F.log_softmax(student_logits, dim=-1),
                    F.softmax(teacher_logits, dim=-1)
                ) * (self.temperature ** 2)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
Back to Blog

관련 글

더 보기 »