import {
  And,
  Constant,
  Expression,
  f,
  From,
  If,
  M,
  Markup,
  Now,
  Ty,
  UI,
  UnionAll,
  Values,
} from '@cotera/era';
import { SYSTEM_TABLES } from '../../runtime/system-tables';
import {
  EVENT_CURSORS,
  ENTITY_DEFINITIONS,
  MANIFEST_EVENT_STREAM_SINKS,
} from '../../runtime/vars';
import { SinkTooling } from '../streams';

// Sentinel entity id meaning "do not filter by entity": it is offered as a
// picker option and the filters in this module treat a match as always-true.
const ALL = 'ALL';

// Single-row relation carrying the `ALL` sentinel as a selectable name.
const allEntitiesOption = Values([{ name: ALL }]);

// `entity_id` of every cursor row, exposed under the column `name`.
const cursorEntityIds = EVENT_CURSORS.select((cursor) => ({
  name: cursor.attr('entity_id'),
}));

// `name` of every entity definition.
const definedEntityNames = ENTITY_DEFINITIONS.select((definition) =>
  definition.pick('name')
);

/**
 * Relation with a single `name` column listing the entity ids that can be
 * chosen in the picker: the `ALL` sentinel, the ids found on cursors and the
 * names of the entity definitions, unioned in that order.
 *
 * NOTE(review): `UnionAll` presumably keeps duplicates (an id may appear on
 * several cursor rows and also as a definition) — confirm the consumer
 * de-duplicates the options.
 */
const AvailableEntityIds = UnionAll([
  allEntitiesOption,
  cursorEntityIds,
  definedEntityNames,
]);

/**
 * Rows of the `entity_events` system table, optionally narrowed to one entity.
 *
 * @param p.entityId - Entity id expression. When it equals the `ALL` sentinel
 *   the filter is constantly true; otherwise only rows whose `entity_id`
 *   equals it are kept.
 * @returns The filtered relation.
 */
const entityEvents = (p: { entityId: Expression }) =>
  From(SYSTEM_TABLES.entity_events).where((row) => {
    const isAllSelected = p.entityId.eq(ALL);
    const belongsToEntity = row.attr('entity_id').eq(p.entityId);
    return If(isAllSelected, {
      then: true,
      else: belongsToEntity,
    });
  });

/**
 * Rows of `EVENT_CURSORS`, optionally narrowed to one entity.
 *
 * @param entityId - Entity id expression. When it equals the `ALL` sentinel
 *   every cursor row is kept; otherwise only rows whose `entity_id` equals it.
 * @returns The filtered cursor relation.
 */
const cursorForEntity = ({ entityId }: { entityId: Expression }) =>
  EVENT_CURSORS.where((cursor) => {
    const isAllSelected = entityId.eq(ALL);
    const belongsToEntity = cursor.attr('entity_id').eq(entityId);
    return If(isAllSelected, {
      then: true,
      else: belongsToEntity,
    });
  });

/**
 * Entity events used by the stream analysis, optionally narrowed to one
 * entity.
 *
 * This is the same query as `entityEvents` (same table, same `ALL`-aware
 * filter), so it delegates to it instead of keeping a second copy that could
 * drift out of sync.
 *
 * @param p.entityId - Entity id expression, or the `ALL` sentinel for no
 *   filtering.
 * @returns The filtered `entity_events` relation.
 */
const streamEvents = (p: { entityId: Expression }) =>
  entityEvents({ entityId: p.entityId });

/**
 * Line chart of event counts per event stream per day.
 *
 * Takes the entity's events, keeps those whose `detected_at_time` is later
 * than `Now()` minus 14 days, counts them per (`event_stream_id`, day) and
 * plots the count over the day with one series per event stream.
 *
 * @param entityId - Entity id expression, or the `ALL` sentinel.
 * @returns The line chart built from the daily counts.
 */
const EntityStreamAmounts = ({ entityId }: { entityId: Expression }) => {
  const events = entityEvents({ entityId });
  const cutoff = Now().dateSub('days', 14);

  // The time window is expressed as a raw SQL predicate on the event rows.
  const recentEvents = events.sqlWhere`"detected_at_time" > ${cutoff}`;

  // One row per stream and calendar day; the count lands in the `COUNT` column.
  const dailyCounts = recentEvents.countBy((row) => ({
    ...row.pick('event_stream_id'),
    day: row.attr('detected_at_time').dateTrunc('day'),
  }));

  return dailyCounts.chart.LineChart(
    (row) => ({
      x: row.attr('day'),
      y: row.attr('COUNT'),
      category: row.attr('event_stream_id'),
    }),
    {
      axis: { x: { scale: 'day' } },
      title: 'Events streams counts in the past two weeks',
    }
  );
};

/**
 * Counts events per event stream, split by whether they are at or before the
 * matching cursor.
 *
 * Events are inner-joined to cursors on both `event_stream_id` and
 * `entity_id`. An event is flagged `processed` when its `detected_at_time` is
 * less than or equal to the cursor's `detected_at_time`. The result is
 * counted per (`processed`, `event_stream_id`).
 *
 * @param entityId - Entity id expression, or the `ALL` sentinel.
 * @returns Relation with `processed`, `event_stream_id` and the count column.
 */
const StreamAnalysis = ({ entityId }: { entityId: Expression }) => {
  const events = streamEvents({ entityId });
  const cursors = cursorForEntity({ entityId });

  const flaggedEvents = events.innerJoin(cursors, (ev, cur) => {
    const sameStream = cur
      .attr('event_stream_id')
      .eq(ev.attr('event_stream_id'));
    const sameEntity = cur.attr('entity_id').eq(ev.attr('entity_id'));

    return {
      on: And(sameStream, sameEntity),
      select: {
        event_stream_id: ev.attr('event_stream_id'),
        processed: ev
          .attr('detected_at_time')
          .lte(cur.attr('detected_at_time')),
      },
    };
  });

  return flaggedEvents.countBy((row) => row.pick('processed', 'event_stream_id'));
};

/**
 * Markup for the "Events" observability page.
 *
 * UI state:
 *  - `entityId`: string, initially the `ALL` sentinel (no entity filter).
 *  - `tab`: enum of `Cursors` | `Advanced`, initially `Cursors`.
 *
 * Layout, top to bottom:
 *  1. Page header.
 *  2. Entity picker fed by `AvailableEntityIds`, shown as a tab selector.
 *  3. Block with the per-stream daily event line chart for the selection.
 *  4. "Event Stream Cursors" block: a grouped horizontal bar chart of
 *     processed vs. pending event counts per stream, followed by tabs whose
 *     content depends on `tab`.
 */
export const EventsObservability: Markup = Markup.fromShorthand(
  UI.State(
    {
      // Use the shared sentinel rather than a second 'ALL' literal so the
      // default selection always matches what the filters treat as "no filter".
      entityId: Constant(ALL, { ty: 'string' }),
      tab: Constant('Cursors', { ty: Ty.e(['Cursors', 'Advanced']) }),
    },
    ({ entityId, tab }) => [
      UI.Header({
        title: 'Events',
        caption: 'Cotera Warehouse Driven Eventing',
      }),
      entityId.PickFrom(AvailableEntityIds, (t) => t.attr('name'), {
        display: 'tab-selector',
      }),
      UI.Block([EntityStreamAmounts({ entityId })], {
        title: f`Entity: ${entityId}`,
      }),
      UI.Block(
        [
          // Processed vs. pending counts per event stream for the selection.
          // NOTE(review): this join matches on `event_stream_id` only; with
          // the `ALL` selection several cursor rows may share a stream id —
          // confirm the resulting fan-out is intended.
          SinkTooling(cursorForEntity({ entityId }))
            .innerJoin(
              StreamAnalysis({
                entityId,
              }),
              (t, s) => ({
                on: t.attr('event_stream_id').eq(s.attr('event_stream_id')),
                select: {
                  ...t.star(),
                  processed: s.attr('processed'),
                  events: s.attr('COUNT'),
                },
              })
            )
            .chart.BarChart(
              (t) => ({
                y: t.attr('events'),
                x: t.attr('event_stream_id'),
                category: If(t.attr('processed'), {
                  then: 'Processed',
                  else: 'Pending',
                }),
              }),
              {
                type: 'grouped',
                direction: 'horizontal',
              }
            ),
          UI.TabsV2(tab, [
            M.muCase([
              {
                // "Cursors" tab: right join against the manifest sinks on
                // `sink_id`. The key columns are coalesced from cursor to
                // sink so that sinks without a matching cursor row still
                // carry their ids; the result is then made distinct.
                when: tab.eq('Cursors'),
                then: SinkTooling(cursorForEntity({ entityId }))
                  .rightJoin(MANIFEST_EVENT_STREAM_SINKS, (cursor, sink) => ({
                    on: cursor.attr('sink_id').eq(sink.attr('sink_id')),
                    select: {
                      ...cursor.star(),
                      entity_id: cursor
                        .attr('entity_id')
                        .coalesce(sink.attr('entity_id')),
                      sink_id: cursor
                        .attr('sink_id')
                        .coalesce(sink.attr('sink_id')),
                      event_stream_id: cursor
                        .attr('event_stream_id')
                        .coalesce(sink.attr('event_stream_id')),
                    },
                  }))
                  .select((t) => t.star(), { distinct: true }),
              },
              {
                // "Advanced" tab: the manifest right join happens before
                // `SinkTooling` and matches on `event_stream_id`; the result
                // is then left-joined with the processed/pending counts so
                // rows without any counted events are kept.
                // NOTE(review): the "Cursors" tab joins the manifest on
                // `sink_id` while this one uses `event_stream_id` — confirm
                // the difference is deliberate.
                when: tab.eq('Advanced'),
                then: SinkTooling(
                  cursorForEntity({ entityId }).rightJoin(
                    MANIFEST_EVENT_STREAM_SINKS,
                    (cursor, manifest) => ({
                      on: cursor
                        .attr('event_stream_id')
                        .eq(manifest.attr('event_stream_id')),
                      select: {
                        ...cursor.pick('paused_at', 'cotera_stable_id'),
                        entity_id: cursor
                          .attr('entity_id')
                          .coalesce(manifest.attr('entity_id')),
                        sink_id: cursor
                          .attr('sink_id')
                          .coalesce(manifest.attr('sink_id')),
                        event_stream_id: cursor
                          .attr('event_stream_id')
                          .coalesce(manifest.attr('event_stream_id')),
                        detected_at_time: cursor.attr('detected_at_time'),
                      },
                    })
                  )
                ).leftJoin(
                  StreamAnalysis({
                    entityId,
                  }),
                  (t, s) => ({
                    on: t.attr('event_stream_id').eq(s.attr('event_stream_id')),
                    select: {
                      ...t.star(),
                      processed: s.attr('processed'),
                      events: s.attr('COUNT'),
                    },
                  })
                ),
              },
            ]),
          ]),
        ],
        { title: 'Event Stream Cursors' }
      ),
    ]
  )
);
