import { AST } from '../ast';
import { Assert } from '../utils';
import { evalExprIR } from './eval-expr-ir';
import { Ty } from '../ty';
import { ExprIR } from '../ast/expr';
import hash from 'object-hash';
import type { DB } from './eval-ddl';
import _ from 'lodash';
import { checkRel } from '../type-checker/check-rel';
import { TyStackTrace } from '../type-checker';
import { IRChildren } from '../ast/base';
import { KeyVal } from './kv';

// Row stand-in whose every property read yields `null` (the `get` trap ignores
// the property name). `doJoin` uses it as the missing side of a left/right
// join when no partner row matched.
const ALWAYS_NULL_PROXY: Ty.Row = new Proxy({}, { get: () => null });

// A lazily produced sequence of rows.
type Stream = Generator<Ty.Row>;
// A pipeline stage: consumes one row stream and produces another.
type Transform = (x: Stream) => Stream;

// Database used when the caller does not supply one; it contains no schemas.
const EMPTY_DB: DB = {};

/**
 * Public entry point: evaluates a relational IR node and returns its rows as
 * a lazy stream.
 *
 * @param rel - The relation to evaluate.
 * @param opts - Optional settings; `opts.db` is the database handed to the
 *   evaluator. When it is absent, `EMPTY_DB` is used instead.
 * @returns The generator produced by `doEvalRelIR`.
 */
export const evalRelIR = (rel: AST.RelIR, opts?: { db?: DB }) => {
  const db = opts?.db ?? EMPTY_DB;
  return doEvalRelIR(rel, { db });
};

// Shared signature of the per-node evaluators in this file: given a relation
// node of type `T` and the database to read from, produce a row stream.
type ExecRel<T extends AST.RelIR> = (rel: T, opts: { db: DB }) => Stream;

/**
 * Evaluates one relational IR node and lazily yields its rows.
 *
 * `values` rows are yielded directly; every other node kind is delegated to
 * its dedicated evaluator, selected by the node's `t` tag.
 *
 * @param rel - The relation node to evaluate.
 * @param opts - Evaluation options. A missing `opts` or `opts.db` falls back
 *   to `EMPTY_DB`.
 * @returns A generator over the relation's rows.
 */
const doEvalRelIR: ExecRel<AST.RelIR> = function* (
  rel: AST.RelIR,
  opts: { db: DB }
): Generator<Ty.Row> {
  // Resolve the default once and hand the same resolved options to every
  // evaluator, so no branch can observe a missing `db`.
  const { db = EMPTY_DB } = opts ?? {};
  const resolved = { db };

  switch (rel.t) {
    case 'values': {
      for (const row of rel.values) {
        yield row;
      }
      break;
    }
    case 'generate-series': {
      yield* doGenerateSeries(rel, resolved);
      break;
    }
    case 'select': {
      yield* doSelect(rel, resolved);
      break;
    }
    case 'union': {
      yield* doUnion(rel, resolved);
      break;
    }
    case 'table': {
      yield* doTable(rel, resolved);
      break;
    }
    case 'file': {
      yield* doFile(rel, resolved);
      break;
    }
    case 'aggregate': {
      yield* doAggregate(rel, resolved);
      break;
    }
    case 'join': {
      yield* doJoin(rel, resolved);
      break;
    }
    case 'information-schema': {
      yield* doInfoSchema(rel, resolved);
      break;
    }
    default: {
      // Exhaustiveness guard: every known tag is handled above.
      return Assert.unreachable(rel);
    }
  }
};

/**
 * Produces information-schema rows by walking every schema and table in
 * `opts.db`.
 *
 * - `rel.type === 'tables'`: one row per table (`table_schema`, `table_name`).
 * - `rel.type === 'columns'`: one row per attribute reported by `checkRel` for
 *   the table's relation (`table_name`, `table_schema`, `column_name`,
 *   `is_nullable` as `'yes'`/`'no'`, `data_type` via `Ty.displayTy`).
 *
 * The `columns` view asserts that `checkRel` did not return a `TyStackTrace`.
 */
const doInfoSchema: ExecRel<AST._InformationSchema> = function* (rel, opts) {
  const view = rel.type;

  for (const [table_schema, tables] of Object.entries(opts.db)) {
    for (const [table_name, table] of Object.entries(tables)) {
      switch (view) {
        case 'tables': {
          yield { table_schema, table_name } as Ty.Row;
          break;
        }
        case 'columns': {
          const checked = checkRel(table.data);
          Assert.assert(!(checked instanceof TyStackTrace));

          const columns = Object.entries(checked.attributes);
          for (const [column_name, columnTy] of columns) {
            const is_nullable = columnTy.nullable ? 'yes' : 'no';
            const data_type = Ty.displayTy(columnTy);
            yield {
              table_name,
              table_schema,
              column_name,
              is_nullable,
              data_type,
            } as Ty.Row;
          }
          break;
        }
        default: {
          // Exhaustiveness guard for unknown view kinds.
          return Assert.unreachable(view);
        }
      }
    }
  }
};

/**
 * Nested-loop join of `rel.sources.left` and `rel.sources.right`.
 *
 * Both inputs are fully materialised (left first) before any row is produced.
 * A pair matches only when `rel.condition` evaluates to exactly `true`; each
 * emitted row is `rel.selection` evaluated against the pair.
 *
 * - `inner`: emits every matching pair.
 * - `left`: additionally emits each left row without a partner, paired with
 *   `ALWAYS_NULL_PROXY` on the right.
 * - `right`: mirror image of `left`, driven by the right input.
 */
const doJoin: ExecRel<AST._Join<IRChildren>> = function* (rel, opts) {
  type Pair = { left: Ty.Row; right: Ty.Row };

  const leftRows = Array.from(doEvalRelIR(rel.sources.left, opts));
  const rightRows = Array.from(doEvalRelIR(rel.sources.right, opts));

  const isMatch = (pair: Pair): boolean =>
    evalExprIR(rel.condition, { sources: pair }) === true;

  const toOutputRow = (pair: Pair): Ty.Row =>
    _.mapValues(rel.selection, (expr) => evalExprIR(expr, { sources: pair }));

  switch (rel.how) {
    case 'inner': {
      for (const left of leftRows) {
        for (const right of rightRows) {
          if (!isMatch({ left, right })) {
            continue;
          }
          yield toOutputRow({ left, right });
        }
      }
      break;
    }
    case 'left': {
      for (const left of leftRows) {
        const partners = rightRows.filter((right) => isMatch({ left, right }));

        if (partners.length === 0) {
          // No partner: keep the left row, right-hand columns read as null.
          yield toOutputRow({ left, right: ALWAYS_NULL_PROXY });
          continue;
        }

        for (const right of partners) {
          yield toOutputRow({ left, right });
        }
      }
      break;
    }
    case 'right': {
      for (const right of rightRows) {
        const partners = leftRows.filter((left) => isMatch({ left, right }));

        if (partners.length === 0) {
          // No partner: keep the right row, left-hand columns read as null.
          yield toOutputRow({ left: ALWAYS_NULL_PROXY, right });
          continue;
        }

        for (const left of partners) {
          yield toOutputRow({ left, right });
        }
      }
      break;
    }
    default: {
      // Exhaustiveness guard for unknown join kinds.
      return Assert.unreachable(rel.how);
    }
  }
};

/**
 * Groups the rows of `rel.sources.from` by `rel.groupedAttributes` and yields
 * one output row per group.
 *
 * The group key is the list of `[attribute, value]` pairs picked from the
 * row, sorted by attribute name. For each group, every expression in
 * `rel.selection` is evaluated with `sources.from` set to the object rebuilt
 * from the key and `rows` set to the group's member rows.
 *
 * NOTE(review): when the input produces no rows, no group exists and nothing
 * is yielded, even if `groupedAttributes` is empty — confirm this is the
 * intended result for an aggregate without grouping over empty input.
 */
const doAggregate: ExecRel<AST._Aggregate<IRChildren>> = function* (rel, opts) {
  const { groupedAttributes, selection } = rel;
  const input = doEvalRelIR(rel.sources.from, opts);

  const buckets = new KeyVal<Ty.Row[]>();

  for (const row of input) {
    const groupedPairs = Object.entries(_.pick(row, groupedAttributes));
    const bucketKey = _.sortBy(groupedPairs, ([attribute]) => attribute);

    // Append the row to its bucket, starting a new bucket on first sight.
    buckets.insertWithPrevious(bucketKey, (members = []) => [...members, row]);
  }

  for (const { key, val } of buckets) {
    const groupRow = Object.fromEntries(key);

    yield _.mapValues(selection, (expr) =>
      evalExprIR(expr, { sources: { from: groupRow }, rows: val })
    );
  }
};

/**
 * Evaluator for `file` relations. File-backed sources are not implemented, so
 * calling this always throws.
 *
 * @throws Error with the message `File Sources Not Supported`.
 */
const doFile: ExecRel<AST._FileContract> = function (): never {
  throw new Error('File Sources Not Supported');
};

/**
 * Evaluates a table reference by looking up `rel.schema` / `rel.name` in the
 * database and evaluating the relation stored under the table's `data`.
 *
 * Lookups consider own properties only, so names that collide with inherited
 * object members (e.g. `constructor`, `toString`) are reported as missing
 * instead of resolving to a prototype member.
 *
 * @throws Error when the schema does not exist (lists the available schemas).
 * @throws Error when the table does not exist in the schema (lists the
 *   schema's tables).
 */
const doTable: ExecRel<AST._TableContract> = function* (rel, { db }) {
  const owns = (container: object, key: string): boolean =>
    Object.prototype.hasOwnProperty.call(container, key);

  const quotedList = (names: string[]): string =>
    names.map((x) => `"${x}"`).join(', ');

  const schema = owns(db, rel.schema) ? db[rel.schema] : undefined;
  if (!schema) {
    throw new Error(
      `Schema "${rel.schema}" not found, available schema: ${quotedList(
        Object.keys(db)
      )}`
    );
  }

  const table = owns(schema, rel.name) ? schema[rel.name] : undefined;
  if (!table) {
    throw new Error(
      `Table "${rel.schema}"."${
        rel.name
      }" not found, available tables in schema: ${quotedList(
        Object.keys(schema)
      )}`
    );
  }

  yield* doEvalRelIR(table.data, { db });
};

/**
 * Evaluates a union: all rows of the left input followed by all rows of the
 * right input. Unless `rel.all` is set, the concatenation is passed through
 * `doDistinct` so duplicate rows are dropped.
 */
const doUnion: ExecRel<AST._Union<IRChildren>> = (rel, opts) => {
  const leftRows = doEvalRelIR(rel.sources.left, opts);
  const rightRows = doEvalRelIR(rel.sources.right, opts);

  function* concatenated() {
    yield* leftRows;
    yield* rightRows;
  }

  const dropDuplicates = !rel.all;
  return pipe(concatenated(), [doDistinct(dropDuplicates)]);
};

/**
 * Yields rows `{ n }` for `n` running from `rel.start` to `rel.stop`
 * (inclusive) in steps of one.
 *
 * Both bounds are evaluated with no row sources and asserted to be numbers.
 * Nothing is yielded when `start` is greater than `stop`.
 */
const doGenerateSeries: ExecRel<AST._GenerateSeries<IRChildren>> = function* (
  rel,
  _opts
) {
  const lower = evalExprIR(rel.start, { sources: {} });
  const upper = evalExprIR(rel.stop, { sources: {} });

  Assert.assert(typeof lower === 'number' && typeof upper === 'number');

  let n = lower;
  while (n <= upper) {
    yield { n };
    n++;
  }
};

/**
 * Evaluates a `select` node as a pipeline over the rows of `rel.sources.from`.
 *
 * Stages run in this order: optional filter (`rel.condition`), projection
 * (`rel.selection`), ordering (`rel.orderBys`), duplicate removal
 * (`rel.distinct`), then offset/limit. Ordering and duplicate removal
 * therefore operate on the projected rows.
 */
const doSelect: ExecRel<AST._Select<IRChildren>> = function* (rel, opts) {
  const { condition, selection, orderBys, distinct, limit, offset } = rel;
  const input = doEvalRelIR(rel.sources.from, opts);

  yield* pipe(input, [
    // Without a condition the filter stage passes the stream through as-is.
    condition === null ? (unfiltered) => unfiltered : doWhereClause(condition),
    doProject(selection),
    doOrderBy(orderBys),
    doDistinct(distinct),
    // `offest` is the key `doLimitOffset` expects.
    doLimitOffset({ limit, offest: offset }),
  ]);
};

/**
 * Builds a transform that sorts the stream by the given ORDER BY keys.
 *
 * With no keys the stream is passed through untouched (and stays lazy).
 * Otherwise the whole stream is materialised and sorted; keys are compared in
 * order, and a later key is consulted only when all earlier keys tie.
 *
 * Per key:
 * - `null` sorts after every non-null value for `asc` and before it for any
 *   other direction; two `null`s tie.
 * - Non-null values are compared with `>=` in both directions; values that
 *   are equal, or that are not comparable either way, tie on this key.
 *
 * @param orderBys - Sort keys, most significant first.
 * @returns A transform yielding the rows in sorted order.
 */
const doOrderBy = (orderBys: readonly AST.OrderBy<IRChildren>[]): Transform =>
  function* (stream) {
    if (orderBys.length === 0) {
      yield* stream;
      return;
    }

    const cmp = (a: Ty.Row, b: Ty.Row): number => {
      for (const { expr, direction } of orderBys) {
        // +1 means "a sorts after b" for an ascending key; flipped otherwise.
        const sign = direction === 'asc' ? 1 : -1;

        const aVal = evalExprIR(expr, { sources: { from: a } });
        const bVal = evalExprIR(expr, { sources: { from: b } });

        if (aVal === null && bVal === null) {
          continue;
        }
        if (aVal === null) {
          return sign;
        }
        if (bVal === null) {
          return -sign;
        }

        const aGteB = aVal >= bVal;
        const bGteA = bVal >= aVal;

        // Both true: equal on this key. Both false: not comparable. Either
        // way this key cannot decide, so fall through to the next one rather
        // than declaring the rows equal overall.
        if (aGteB === bGteA) {
          continue;
        }

        return aGteB ? sign : -sign;
      }

      return 0;
    };

    for (const row of [...stream].sort(cmp)) {
      yield row;
    }
  };

/**
 * Builds a transform that optionally removes duplicate rows.
 *
 * When `distinct` is false the stream is passed through unchanged. Otherwise
 * each row is fingerprinted with `hash(row, { algorithm: 'passthrough' })`
 * and only the first row for each fingerprint is yielded, in input order.
 *
 * @param distinct - Whether duplicates should be dropped.
 */
const doDistinct = (distinct: boolean): Transform =>
  function* (stream) {
    if (!distinct) {
      yield* stream;
      return;
    }

    const emitted = new Set<string>();
    for (const row of stream) {
      const fingerprint = hash(row, { algorithm: 'passthrough' });
      if (!emitted.has(fingerprint)) {
        yield row;
        emitted.add(fingerprint);
      }
    }
  };

/**
 * Builds a transform that replaces every input row with a new row holding
 * one value per entry of `projection`: the entry's expression evaluated with
 * the input row as `sources.from`.
 *
 * @param projection - Output column name to expression.
 */
const doProject = (projection: Record<string, AST.ExprIR>): Transform =>
  function* (stream) {
    for (const input of stream) {
      const evaluate = (expr: AST.ExprIR) =>
        evalExprIR(expr, { sources: { from: input } });

      yield _.mapValues(projection, (expr) => evaluate(expr));
    }
  };

/**
 * Builds a filtering transform: a row is kept only when `condition`,
 * evaluated with the row as `sources.from`, is exactly `true`. Any other
 * result (including `null` or other truthy values) drops the row.
 *
 * @param condition - Predicate expression evaluated per row.
 */
const doWhereClause = (condition: ExprIR): Transform =>
  function* (stream: Stream): Stream {
    for (const row of stream) {
      const verdict = evalExprIR(condition, { sources: { from: row } });
      if (verdict !== true) {
        continue;
      }
      yield row;
    }
  };

/**
 * Builds a transform that skips the first `offest` rows and then yields at
 * most `limit` rows.
 *
 * The transform stops as soon as the last permitted row has been yielded, so
 * the upstream is never asked for a row beyond it. This matters when
 * producing the next upstream row is expensive, e.g. a selective filter over
 * a long source.
 *
 * NOTE(review): the option key is spelled `offest`; it is kept because
 * callers pass that key.
 *
 * @param opts.limit - Maximum number of rows to yield; `null` means no limit.
 * @param opts.offest - Number of leading rows to skip; `null` means none.
 */
const doLimitOffset = (opts: {
  limit: number | null;
  offest: number | null;
}): Transform =>
  function* (stream: Stream): Stream {
    let toSkip = opts.offest ?? 0;
    let remaining = opts.limit ?? Infinity;

    for (const row of stream) {
      if (toSkip > 0) {
        toSkip--;
        continue;
      }

      // Covers a limit that allows no rows at all (zero or negative).
      if (!(remaining > 0)) {
        return;
      }

      yield row;
      remaining--;

      // Stop before the loop pulls another row from upstream.
      if (!(remaining > 0)) {
        return;
      }
    }
  };

/**
 * Threads a stream through a list of transforms, left to right: the output
 * of each transform becomes the input of the next.
 *
 * @param source - The initial stream.
 * @param transforms - Stages to apply in order; an empty list returns
 *   `source` itself.
 * @returns The stream produced by the final stage.
 */
const pipe = (source: Stream, transforms: Transform[]): Stream => {
  let current = source;
  transforms.forEach((apply) => {
    current = apply(current);
  });
  return current;
};
