#!/usr/bin/env python
"""Tests for the registry flows."""

import os

from absl import app

from grr_response_core.lib import rdfvalue
from grr_response_core.lib.rdfvalues import file_finder as rdf_file_finder
from grr_response_core.lib.rdfvalues import mig_client
from grr_response_core.lib.rdfvalues import paths as rdf_paths
from grr_response_proto import knowledge_base_pb2
from grr_response_server import artifact
from grr_response_server import data_store
from grr_response_server.flows.general import registry
from grr_response_server.flows.general import registry_finder
from grr_response_server.flows.general import transfer
from grr.test_lib import action_mocks
from grr.test_lib import flow_test_lib
from grr.test_lib import test_lib
from grr.test_lib import vfs_test_lib


class RegistryFlowTest(flow_test_lib.FlowTestsBaseclass):
  """Base class for flow tests that need a fake registry.

  For each test, pathspecs of type REGISTRY are handled by
  `vfs_test_lib.FakeRegistryVFSHandler`.
  """

  def setUp(self):
    """Starts the fake registry VFS override and schedules its removal."""
    super().setUp()

    registry_type = rdf_paths.PathSpec.PathType.REGISTRY
    fake_registry = vfs_test_lib.VFSOverrider(
        registry_type, vfs_test_lib.FakeRegistryVFSHandler
    )
    fake_registry.Start()
    # Undo the override once the test is done.
    self.addCleanup(fake_registry.Stop)


class TestFakeRegistryFinderFlow(RegistryFlowTest):
  """Tests for the RegistryFinder flow.

  The Run key values these tests expect (Sidebar and MctAdmin) are defined in
  test_data/client_fixture.py.
  """

  runkey = "HKEY_USERS/S-1-5-20/Software/Microsoft/Windows/CurrentVersion/Run/*"

  # Collapsed pathspec path of the Sidebar value under the Run key.
  _SIDEBAR_PATH = (
      "/HKEY_USERS/S-1-5-20/Software/Microsoft/"
      "Windows/CurrentVersion/Run/Sidebar"
  )

  # Seconds since epoch. A +/- 1 second window around this value matches both
  # Run key values in the modification time tests below.
  _RUN_VALUES_MTIME_SECS = 1247546054

  def RunFlow(self, client_id, keys_paths=None, conditions=None):
    """Runs RegistryFinder on a client and returns the flow's session id.

    Args:
      client_id: Id of the client to run the flow on.
      keys_paths: Registry key globs to search. Defaults to `[self.runkey]`.
      conditions: RegistryFinderCondition values to apply. Defaults to an empty
        list (no conditions).

    Returns:
      The value returned by `flow_test_lib.StartAndRunFlow`.
    """
    if keys_paths is None:
      keys_paths = [self.runkey]
    if conditions is None:
      conditions = []

    flow_args = registry_finder.RegistryFinderArgs(
        keys_paths=keys_paths,
        conditions=conditions,
    )
    return flow_test_lib.StartAndRunFlow(
        registry_finder.RegistryFinder,
        action_mocks.ClientFileFinderWithVFS(),
        client_id=client_id,
        flow_args=flow_args,
        creator=self.test_username,
    )

  def _LiteralMatchCondition(self, literal):
    """Returns a VALUE_LITERAL_MATCH condition with 10 bytes of context."""
    condition_type = registry_finder.RegistryFinderCondition.Type
    return registry_finder.RegistryFinderCondition(
        condition_type=condition_type.VALUE_LITERAL_MATCH,
        value_literal_match=rdf_file_finder.FileFinderContentsLiteralMatchCondition(
            bytes_before=10, bytes_after=10, literal=literal
        ),
    )

  def _RegexMatchCondition(self, regex):
    """Returns a VALUE_REGEX_MATCH condition with 10 bytes of context."""
    condition_type = registry_finder.RegistryFinderCondition.Type
    return registry_finder.RegistryFinderCondition(
        condition_type=condition_type.VALUE_REGEX_MATCH,
        value_regex_match=rdf_file_finder.FileFinderContentsRegexMatchCondition(
            bytes_before=10, bytes_after=10, regex=regex
        ),
    )

  def _ModificationTimeCondition(self, min_secs, max_secs):
    """Returns a MODIFICATION_TIME condition for [min_secs, max_secs].

    Args:
      min_secs: Lower bound, in seconds since epoch.
      max_secs: Upper bound, in seconds since epoch.
    """
    condition_type = registry_finder.RegistryFinderCondition.Type
    return registry_finder.RegistryFinderCondition(
        condition_type=condition_type.MODIFICATION_TIME,
        modification_time=rdf_file_finder.FileFinderModificationTimeCondition(
            min_last_modified_time=rdfvalue.RDFDatetime.FromSecondsSinceEpoch(
                min_secs
            ),
            max_last_modified_time=rdfvalue.RDFDatetime.FromSecondsSinceEpoch(
                max_secs
            ),
        ),
    )

  def _AssertBothRunValuesFound(self, results):
    """Asserts that `results` holds exactly the Sidebar and MctAdmin keys."""
    self.assertLen(results, 2)
    # We expect Sidebar and MctAdmin keys here (see
    # test_data/client_fixture.py).
    basenames = [os.path.basename(r.stat_entry.pathspec.path) for r in results]
    self.assertCountEqual(basenames, ["Sidebar", "MctAdmin"])

  def _AssertSingleSidebarMatch(self, results):
    """Asserts that `results` is one Sidebar result with one content match."""
    self.assertLen(results, 1)
    self.assertLen(results[0].matches, 1)

    # The matching fragment is at offset 15 and bytes_before is 10. Hence,
    # the offset is 5.
    self.assertEqual(results[0].matches[0].offset, 5)
    self.assertEqual(
        results[0].matches[0].data,
        b"ramFiles%\\Windows Sidebar\\Sidebar.exe /autoRun",
    )

    self.assertEqual(
        results[0].stat_entry.pathspec.CollapsePath(), self._SIDEBAR_PATH
    )
    self.assertEqual(
        results[0].stat_entry.pathspec.pathtype,
        rdf_paths.PathSpec.PathType.REGISTRY,
    )

  def testFindsNothingIfNothingMatchesTheGlob(self):
    """A glob that matches no value yields no results."""
    client_id = self.SetupClient(0)
    session_id = self.RunFlow(
        client_id,
        [
            "HKEY_USERS/S-1-5-20/Software/Microsoft/"
            "Windows/CurrentVersion/Run/NonMatch*"
        ],
    )
    self.assertFalse(flow_test_lib.GetFlowResults(client_id, session_id))

  def testFindsKeysWithSingleGlobWithoutConditions(self):
    """A single glob over the Run key finds both values."""
    client_id = self.SetupClient(0)
    session_id = self.RunFlow(client_id, [self.runkey])

    results = flow_test_lib.GetFlowResults(client_id, session_id)
    self._AssertBothRunValuesFound(results)

  def testFindsKeysWithTwoGlobsWithoutConditions(self):
    """Two globs, each matching one value, find both values."""
    client_id = self.SetupClient(0)
    session_id = self.RunFlow(
        client_id,
        [
            (
                "HKEY_USERS/S-1-5-20/Software/Microsoft/"
                "Windows/CurrentVersion/Run/Side*"
            ),
            (
                "HKEY_USERS/S-1-5-20/Software/Microsoft/"
                "Windows/CurrentVersion/Run/Mct*"
            ),
        ],
    )

    results = flow_test_lib.GetFlowResults(client_id, session_id)
    self._AssertBothRunValuesFound(results)

  def testFindsKeyWithInterpolatedGlobWithoutConditions(self):
    """A glob using %%users.sid%% is expanded with the client's user SID."""
    user = knowledge_base_pb2.User(username="foo", sid="S-1-5-20")
    client_id = self.SetupClient(0, users=[user])

    session_id = self.RunFlow(
        client_id,
        [
            "HKEY_USERS/%%users.sid%%/Software/Microsoft/Windows/"
            "CurrentVersion/*"
        ],
    )

    results = flow_test_lib.GetFlowResults(client_id, session_id)
    self.assertLen(results, 1)

    key = "/HKEY_USERS/S-1-5-20/Software/Microsoft/Windows/CurrentVersion/Run"

    self.assertEqual(results[0].stat_entry.pathspec.CollapsePath(), key)
    self.assertEqual(results[0].stat_entry.pathspec.path, key)
    self.assertEqual(
        results[0].stat_entry.pathspec.pathtype,
        rdf_paths.PathSpec.PathType.REGISTRY,
    )

  def testFindsNothingIfNothingMatchesLiteralMatchCondition(self):
    """A literal that occurs in no value yields no results."""
    client_id = self.SetupClient(0)
    session_id = self.RunFlow(
        client_id,
        [self.runkey],
        [self._LiteralMatchCondition(b"CanNotFindMe")],
    )
    self.assertFalse(flow_test_lib.GetFlowResults(client_id, session_id))

  def testFindsKeyIfItMatchesLiteralMatchCondition(self):
    """A literal present only in the Sidebar value returns that value."""
    client_id = self.SetupClient(0)
    session_id = self.RunFlow(
        client_id,
        [self.runkey],
        [self._LiteralMatchCondition(b"Windows Sidebar\\Sidebar.exe")],
    )

    results = flow_test_lib.GetFlowResults(client_id, session_id)
    self._AssertSingleSidebarMatch(results)

  def testFindsNothingIfRegexMatchesNothing(self):
    """A regex that matches no value yields no results."""
    client_id = self.SetupClient(0)
    session_id = self.RunFlow(
        client_id,
        [self.runkey],
        [self._RegexMatchCondition(b".*CanNotFindMe.*")],
    )
    self.assertFalse(flow_test_lib.GetFlowResults(client_id, session_id))

  def testFindsKeyIfItMatchesRegexMatchCondition(self):
    """A regex matching only the Sidebar value returns that value."""
    client_id = self.SetupClient(0)
    session_id = self.RunFlow(
        client_id,
        [self.runkey],
        [self._RegexMatchCondition(b"Windows.+\\.exe")],
    )

    results = flow_test_lib.GetFlowResults(client_id, session_id)
    self._AssertSingleSidebarMatch(results)

  # The method name keeps its historical spelling so that existing references
  # to the test (e.g. test filters) continue to resolve.
  def testFindsNothingIfModiciationTimeConditionMatchesNothing(self):
    """A modification time window of [0s, 1s] matches no value."""
    client_id = self.SetupClient(0)
    session_id = self.RunFlow(
        client_id,
        [self.runkey],
        [self._ModificationTimeCondition(0, 1)],
    )
    self.assertFalse(flow_test_lib.GetFlowResults(client_id, session_id))

  def testFindsKeysIfModificationTimeConditionMatches(self):
    """A window around the fixture timestamp matches both values."""
    client_id = self.SetupClient(0)
    session_id = self.RunFlow(
        client_id,
        [self.runkey],
        [
            self._ModificationTimeCondition(
                self._RUN_VALUES_MTIME_SECS - 1,
                self._RUN_VALUES_MTIME_SECS + 1,
            )
        ],
    )

    results = flow_test_lib.GetFlowResults(client_id, session_id)
    self._AssertBothRunValuesFound(results)

  def testFindsKeyWithLiteralAndModificationTimeConditions(self):
    """Combining both conditions narrows the results down to Sidebar."""
    client_id = self.SetupClient(0)
    session_id = self.RunFlow(
        client_id,
        [self.runkey],
        [
            self._ModificationTimeCondition(
                self._RUN_VALUES_MTIME_SECS - 1,
                self._RUN_VALUES_MTIME_SECS + 1,
            ),
            self._LiteralMatchCondition(b"Windows Sidebar\\Sidebar.exe"),
        ],
    )

    results = flow_test_lib.GetFlowResults(client_id, session_id)
    # The time window alone matches both Sidebar and MctAdmin (see
    # test_data/client_fixture.py); only Sidebar is expected once the literal
    # condition is added.
    self.assertLen(results, 1)
    self.assertEqual(
        results[0].stat_entry.pathspec.CollapsePath(), self._SIDEBAR_PATH
    )

  def testSizeCondition(self):
    """A minimum size of 50 bytes keeps only the larger of the two values."""
    client_id = self.SetupClient(0)
    # There are two values, one is 20 bytes, the other 53.
    session_id = self.RunFlow(
        client_id,
        [self.runkey],
        [
            registry_finder.RegistryFinderCondition(
                condition_type=registry_finder.RegistryFinderCondition.Type.SIZE,
                size=rdf_file_finder.FileFinderSizeCondition(min_file_size=50),
            )
        ],
    )
    results = flow_test_lib.GetFlowResults(client_id, session_id)
    self.assertLen(results, 1)
    self.assertGreater(results[0].stat_entry.st_size, 50)


class TestRegistryFlows(RegistryFlowTest):
  """Test the Run Key registry flows."""

  def testCollectRunKeyBinaries(self):
    """Checks that CollectRunKeyBinaries requests download of a Run key binary.

    The knowledge base is initialized first and stored in the client snapshot;
    then the flow is run while calls to `MultiGetFile.Start` are recorded.
    """
    client_id = self.SetupClient(0, system="Windows", os_version="6.2")
    os_overrider = vfs_test_lib.VFSOverrider(
        rdf_paths.PathSpec.PathType.OS, vfs_test_lib.FakeFullVFSHandler
    )

    with os_overrider:
      client_mock = action_mocks.InterrogatedClient()

      # Get KB initialized
      kb_flow_id = flow_test_lib.StartAndRunFlow(
          artifact.KnowledgeBaseInitializationFlow,
          client_mock,
          client_id=client_id,
          creator=self.test_username,
      )
      knowledge_base = flow_test_lib.GetFlowResults(client_id, kb_flow_id)[0]

      # Store the collected knowledge base in the client's snapshot.
      snapshot = data_store.REL_DB.ReadClientSnapshot(client_id)
      snapshot.knowledge_base.CopyFrom(
          mig_client.ToProtoKnowledgeBase(knowledge_base)
      )
      data_store.REL_DB.WriteClientSnapshot(snapshot)

      with test_lib.Instrument(transfer.MultiGetFile, "Start") as start_calls:
        # Run the flow in the emulated way.
        flow_test_lib.StartAndRunFlow(
            registry.CollectRunKeyBinaries,
            client_mock,
            client_id=client_id,
            creator=self.test_username,
        )

        # Check MultiGetFile got called for our runkey file
        requested = start_calls.args[0][0].args.pathspecs
        self.assertTrue(
            any(p.path == "C:\\Windows\\TEMP\\A.exe" for p in requested)
        )


def main(argv):
  """Runs the full test suite via `test_lib.main`.

  Args:
    argv: Command line arguments, forwarded unchanged to `test_lib.main`.
  """
  test_lib.main(argv)


# When executed as a script, hand `main` to absl's app runner.
if __name__ == "__main__":
  app.run(main)
