import { isEqual } from 'lodash-es';
import { useDispatch } from 'react-redux';

import { getGraphProcess, isJobSubgraph, isSubGraphOrMetaGem } from '../../Parser/bindings';
import { GemSpec } from '../../Parser/types';
import { actionTypes } from '../../redux/action-types';
import { UpOrDownStreamProcessIDsType } from '../../redux/types';
import { useSpecs } from '../graph/gem-specs';
import { useInterimRunId } from '../interims/useInterimRunId';
import { useCurrentDialogGraph, useOriginalGraph } from '../useOriginalGraph';
import type { GraphType } from './types';

export const useStaleDownStreamProcesses = (isSQl?: boolean) => {
  const { rootGraph, rootPath, graph } = useOriginalGraph();
  const { graph: dialogGraph, processId, graphPath } = useCurrentDialogGraph();
  const dialogProcess = dialogGraph.processes[processId];
  const specs = useSpecs();
  const currentGraphModel = isSQl ? graph : rootGraph;

  const dispatch = useDispatch();

  const { interimRunId: _interimRunId, parentInterimRunId } = useInterimRunId();
  const interimRunId = _interimRunId || parentInterimRunId;
  const handleDownStreamStale = () => {
    const originalProcess = getGraphProcess(rootGraph, graphPath, processId, rootPath);
    const isProcessChanged = !isEqual(dialogProcess, originalProcess);
    if (isProcessChanged) {
      const downStreamIds = findUpOrDownStreamProcessAndIds({
        specs,
        graph: currentGraphModel,
        upOrDownStreamProcessIds: [{ processId, sourcePort: '', targetPort: '' }],
        inPortProcessId: processId
      });
      dispatch({
        type: actionTypes.staleDownStreamData,
        payload: downStreamIds.map((item) => ({ ...item, runId: interimRunId }))
      });
      return downStreamIds;
    }
    return [];
  };
  return { handleDownStreamStale };
};
export const findUpOrDownStreamProcessAndIds = ({
  specs,
  graph,
  upOrDownStreamProcessIds,
  inPortProcessId,
  isSubgraphChild = false,
  expandSubgraph = true,
  direction = 'downStream',
  includeSelf = true
}: {
  specs: GemSpec[];
  graph: GraphType;
  upOrDownStreamProcessIds: UpOrDownStreamProcessIDsType;
  inPortProcessId: string;
  isSubgraphChild?: boolean;
  expandSubgraph?: boolean;
  direction?: 'upStream' | 'downStream';
  includeSelf?: boolean;
}): UpOrDownStreamProcessIDsType => {
  const connections = graph.connections;
  const processConnections = connections.filter((connection) =>
    direction === 'downStream' ? connection.source === inPortProcessId : connection.target === inPortProcessId
  );

  if (!upOrDownStreamProcessIds.find(({ processId }) => processId === inPortProcessId) && includeSelf) {
    upOrDownStreamProcessIds.push({
      processId: inPortProcessId,
      sourcePort: '',
      targetPort: ''
    });
  }

  for (const processConnection of processConnections) {
    const process = graph.processes[direction === 'downStream' ? processConnection.target : processConnection.source];

    if (!process) continue;

    // avoid revisting the same processes
    if (upOrDownStreamProcessIds.find(({ processId }) => process?.id === processId)) continue;

    upOrDownStreamProcessIds.push({
      processId: process.id,
      sourcePort: processConnection.sourcePort,
      targetPort: processConnection.targetPort
    });

    const isSubgraph = isSubGraphOrMetaGem(process, specs) || isJobSubgraph(process);
    if (isSubgraph && !isSubgraphChild && expandSubgraph) {
      findUpOrDownStreamProcessAndIds({
        specs,
        graph: process as GraphType,
        upOrDownStreamProcessIds: upOrDownStreamProcessIds,
        inPortProcessId: process.id,
        isSubgraphChild: true,
        expandSubgraph,
        direction
      });
    } else {
      findUpOrDownStreamProcessAndIds({
        specs,
        graph,
        upOrDownStreamProcessIds: upOrDownStreamProcessIds,
        inPortProcessId: process.id,
        isSubgraphChild: false,
        expandSubgraph,
        direction
      });
    }
  }

  return upOrDownStreamProcessIds;
};
