plaso-rubanetra/plaso/frontend/preg.py
2020-04-06 18:48:34 +02:00

2162 lines
71 KiB
Python
Executable File

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2013 The Plaso Project Authors.
# Please see the AUTHORS file for details on individual authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Parse your Windows Registry files using preg.
preg is a simple Windows Registry parser using the plaso Registry plugins and
image parsing capabilities. It uses the back-end libraries of plaso to read
raw image files and extract Registry files from VSS and restore points and then
runs the Registry plugins of plaso against the Registry hive and presents it
in a textual format.
"""
import argparse
import binascii
import logging
import os
import re
import sys
import textwrap
from dfvfs.helpers import file_system_searcher
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import resolver as path_spec_resolver
try:
# Support version 1.X of IPython.
# pylint: disable=no-name-in-module
from IPython.terminal.embed import InteractiveShellEmbed
except ImportError:
# pylint: disable=no-name-in-module
from IPython.frontend.terminal.embed import InteractiveShellEmbed
import IPython
from IPython.config.loader import Config
from IPython.core import magic
import pysmdev
from plaso.artifacts import knowledge_base
from plaso.engine import queue
from plaso.engine import single_process
# Import the winreg formatter to register it, adding the option
# to print event objects using the default formatter.
# pylint: disable=unused-import
from plaso.formatters import winreg as winreg_formatter
from plaso.frontend import frontend
from plaso.frontend import utils as frontend_utils
from plaso.lib import errors
from plaso.lib import event
from plaso.lib import eventdata
from plaso.lib import timelib
from plaso.parsers import context as parsers_context
from plaso.parsers import manager as parsers_manager
from plaso.parsers import winreg as winreg_parser
from plaso.parsers import winreg_plugins # pylint: disable=unused-import
from plaso.preprocessors import interface as preprocess_interface
from plaso.preprocessors import manager as preprocess_manager
from plaso.winreg import cache
from plaso.winreg import path_expander as winreg_path_expander
from plaso.winreg import winregistry
# Older versions of IPython don't have a version_info attribute, in which
# case the version is treated as (0, 0, 0) and the check below fails.
if getattr(IPython, 'version_info', (0, 0, 0)) < (1, 2, 1):
  # Raised at import time, so the module cannot be loaded with an IPython
  # version that is older than 1.2.1.
  raise ImportWarning(
      'Preg requires at least IPython version 1.2.1.')
class ConsoleConfig(object):
  """Helper functions to configure the behavior of the IPython console."""

  @classmethod
  def GetConfig(cls):
    """Retrieves the IPython configuration.

    Returns:
      The object returned by get_ipython() when called from within an
      IPython session, otherwise a new IPython Config object.
    """
    try:
      # "get_ipython" is only defined inside an IPython session, outside of
      # one the lookup of the name raises a NameError.
      ipython_config = get_ipython()  # pylint: disable=undefined-variable
    except NameError:
      ipython_config = Config()
    return ipython_config

  @classmethod
  def SetPrompt(
      cls, hive_path=None, config=None, prepend_string=None):
    """Sets the input prompt template of the console.

    The prompt consists of a time stamp, the hive name or path, the optional
    prepend string and the command counter.

    Args:
      hive_path: Optional name or location of the loaded hive as a string.
                 If not defined a default string is shown instead.
      config: Optional IPython configuration object. If not defined the
              configuration is retrieved using GetConfig.
      prepend_string: An optional string that is injected into the prompt
                      just prior to the command count.
    """
    path_string = u'Unknown hive loaded' if hive_path is None else hive_path

    template_parts = [
        r'[{color.LightBlue}\T{color.Normal}]',
        r'{color.LightPurple} ',
        path_string,
        r'\n{color.Normal}']

    if prepend_string is not None:
      template_parts.append(u'{0:s} '.format(prepend_string))

    template_parts.append(r'[{color.Red}\#{color.Normal}] \$ ')
    in_template = r''.join(template_parts)

    ipython_config = cls.GetConfig() if config is None else config

    # The prompt manager is exposed under one of two attribute names, the
    # second name is tried when the first one is not available.
    try:
      ipython_config.PromptManager.in_template = in_template
    except AttributeError:
      ipython_config.prompt_manager.in_template = in_template
class PregCache(object):
  """Cache storage used for iPython and other aspects of preg.

  All values are class attributes, hence they are shared by every user of
  the class; the class is used without being instantiated.
  """

  # NOTE(review): presumably holds the event objects extracted by the most
  # recent parse; it is not written in this part of the file - confirm.
  events_from_last_parse = []

  # Knowledge base shared by the front-end and the helpers, it holds the
  # preprocessing information and the default codepage.
  knowledge_base_object = knowledge_base.KnowledgeBase()

  # Parser context, used when parsing Registry keys.
  parser_context = None

  # NOTE(review): presumably the PregStorage and PregHelper objects used by
  # the console; neither is assigned in this part of the file - confirm.
  hive_storage = None
  shell_helper = None
class PregEventObjectQueueConsumer(queue.EventObjectQueueConsumer):
  """Event object queue consumer that collects the event objects in a list."""

  def __init__(self, event_queue):
    """Initializes the list event object queue consumer.

    Args:
      event_queue: the event object queue (instance of Queue) to consume
                   the event objects from.
    """
    super(PregEventObjectQueueConsumer, self).__init__(event_queue)
    # The consumed event objects, in the order in which they were consumed.
    self.event_objects = list()

  def _ConsumeEventObject(self, event_object, **unused_kwargs):
    """Stores a consumed event object, callback for ConsumeEventObjects.

    Args:
      event_object: the event object (instance of EventObject) that is
                    added to the event_objects list.
    """
    collected_event_objects = self.event_objects
    collected_event_objects.append(event_object)
class PregFrontend(frontend.ExtractionFrontend):
"""Class that implements the preg front-end."""
# All Registry plugins start with "winreg_", thus the Preg library cuts that
# part off, both for display and matching. That way a plugin can be called by
# the second half of the name, e.g. "userassist" instead of
# "winreg_userassist".
PLUGIN_UNIQUE_NAME_START = len('winreg_')
# Define the different run modes.
RUN_MODE_CONSOLE = 1
RUN_MODE_REG_FILE = 2
RUN_MODE_REG_PLUGIN = 3
RUN_MODE_REG_KEY = 4
def __init__(self, output_writer):
  """Initializes the preg front-end object.

  The front-end always reads its input from stdin.

  Args:
    output_writer: the output writer object that is passed on to the
                   extraction front-end.
  """
  super(PregFrontend, self).__init__(
      frontend.StdinFrontendInputReader(), output_writer)

  # The values below are placeholders until ParseOptions is called.
  self.plugins = None
  self._key_path = None
  self._parse_restore_points = False
  self._verbose_output = False
def GetListOfAllPlugins(self):
  """Returns information about the supported plugins.

  Returns:
    A string containing a header followed by one section for the key
    plugins and one for the value plugins. Every plugin is listed with its
    name, without the "winreg_" prefix, and its description.
  """
  # TODO: replace frontend_utils.FormatHeader by frontend function.
  lines = [frontend_utils.FormatHeader(u'Supported Plugins')]
  all_plugins = parsers_manager.ParsersManager.GetWindowsRegistryPlugins()
  name_offset = self.PLUGIN_UNIQUE_NAME_START

  # Both sections are formatted the same way, only the title and the function
  # that provides the plugin classes differ.
  plugin_sections = (
      (u'Key Plugins', all_plugins.GetAllKeyPlugins),
      (u'Value Plugins', all_plugins.GetAllValuePlugins))

  for title, get_plugin_classes in plugin_sections:
    lines.append(frontend_utils.FormatHeader(title))
    for plugin_cls in get_plugin_classes():
      short_name = plugin_cls.NAME[name_offset:]
      lines.append(frontend_utils.FormatOutputString(
          short_name, plugin_cls.DESCRIPTION))

  return u'\n'.join(lines)
def ParseHive(
    self, hive_path_or_path_spec, hive_collectors, shell_helper,
    key_paths=None, use_plugins=None, verbose=False):
  """Opens a hive file, and returns information about parsed keys.

  The hive is opened once for every collector in hive_collectors. Each of
  the key paths is then looked up in the opened hive and every key that is
  found is described and passed to ParseKey, together with the plugins
  defined in use_plugins.

  Args:
    hive_path_or_path_spec: Full path to the hive file in question as a
                            string, or a path specification object that has
                            the path in its location attribute.
    hive_collectors: A list of tuples of the name of a collector, as a
                     string, and the collector to use (instance of
                     dfvfs.helpers.file_system_searcher.FileSystemSearcher).
    shell_helper: A helper object (instance of PregHelper).
    key_paths: A list of Registry keys paths that are to be parsed.
    use_plugins: A list of plugins used to parse the key, None if all
                 plugins should be used.
    verbose: Print more verbose content, like hex dump of extracted events.

  Returns:
    A string containing extracted information.
  """
  if isinstance(hive_path_or_path_spec, basestring):
    hive_path_spec = None
    hive_path = hive_path_or_path_spec
  else:
    hive_path_spec = hive_path_or_path_spec
    hive_path = hive_path_spec.location

  if key_paths is None:
    key_paths = []

  print_strings = []
  for name, hive_collector in hive_collectors:
    # Printing '*' 80 times.
    print_strings.append(u'*' * 80)
    print_strings.append(
        u'{0:>15} : {1:s}{2:s}'.format(u'Hive File', hive_path, name))

    # Prefer the path specification, it is needed to open a hive by means
    # of a collector.
    if hive_path_spec:
      current_hive = shell_helper.OpenHive(hive_path_spec, hive_collector)
    else:
      current_hive = shell_helper.OpenHive(hive_path, hive_collector)

    if not current_hive:
      continue

    for key_path in key_paths:
      key = current_hive.GetKeyByPath(key_path)

      key_texts = [u'{0:>15} : {1:s}'.format(u'Key Name', key_path)]
      if not key:
        key_texts.append(u'Unable to open key: {0:s}'.format(key_path))
        # Keys that are missing from the hive are only reported in verbose
        # mode.
        if verbose:
          print_strings.extend(key_texts)
        continue

      key_texts.append(
          u'{0:>15} : {1:d}'.format(u'Subkeys', key.number_of_subkeys))
      key_texts.append(
          u'{0:>15} : {1:d}'.format(u'Values', key.number_of_values))
      key_texts.append(u'')

      if verbose:
        key_texts.append(u'{0:-^80}'.format(u' SubKeys '))
        for subkey in key.GetSubkeys():
          key_texts.append(
              u'{0:>15} : {1:s}'.format(u'Key Name', subkey.path))

      key_texts.append(u'')
      key_texts.append(u'{0:-^80}'.format(u' Plugins '))

      output_string = ParseKey(
          key=key, shell_helper=shell_helper, verbose=verbose,
          use_plugins=use_plugins, hive_helper=current_hive)
      key_texts.extend(output_string)

      print_strings.extend(key_texts)

  return u'\n'.join(print_strings)
def ParseOptions(self, options, source_option='source'):
  """Parses the options and initializes the front-end.

  The run mode is determined as follows: the console option selects the
  console mode, a key in combination with a Registry file selects the
  Registry key mode, plugin names select the plugin mode and a Registry
  file on its own selects the Registry file mode.

  Args:
    options: the command line arguments (instance of argparse.Namespace).
    source_option: optional name of the source option. The default is source.
                   The value is not used by this method.

  Raises:
    BadConfigOption: if the options are invalid.
  """
  if not options:
    raise errors.BadConfigOption(u'Missing options.')

  image = getattr(options, 'image', None)
  regfile = getattr(options, 'regfile', None)

  if not image and not regfile:
    raise errors.BadConfigOption(u'Not enough parameters to proceed.')

  if image:
    self._source_path = image

  # Without an image the Registry file has to be a file on disk, with an
  # image it is resolved inside the image instead.
  if regfile and not image and not os.path.isfile(regfile):
    raise errors.BadConfigOption(
        u'Registry file: {0:s} does not exist.'.format(regfile))

  self._key_path = getattr(options, 'key', None)
  self._parse_restore_points = getattr(options, 'restore_points', False)
  self._verbose_output = getattr(options, 'verbose', False)

  self.plugins = parsers_manager.ParsersManager.GetWindowsRegistryPlugins()

  file_to_check = image if image else regfile

  is_file, reason = PathExists(file_to_check)
  if not is_file:
    raise errors.BadConfigOption(
        u'Unable to read the input file with error: {0:s}'.format(reason))

  if getattr(options, 'console', False):
    self.run_mode = self.RUN_MODE_CONSOLE
  elif getattr(options, 'key', u'') and regfile:
    self.run_mode = self.RUN_MODE_REG_KEY
  elif getattr(options, 'plugin_names', u''):
    self.run_mode = self.RUN_MODE_REG_PLUGIN
  elif regfile:
    # A Registry file without a key or plugin names is parsed as a whole.
    self.run_mode = self.RUN_MODE_REG_FILE
  else:
    raise errors.BadConfigOption(
        u'Incorrect usage. You\'ll need to define the path of either '
        u'a storage media image or a Windows Registry file.')
def _ExpandKeysRedirect(self, keys):
"""Expands a list of Registry key paths with their redirect equivalents.
Args:
keys: a list of Windows Registry key paths.
"""
for key in keys:
if key.startswith('\\Software') and 'Wow6432Node' not in key:
_, first, second = key.partition('\\Software')
keys.append(u'{0:s}\\Wow6432Node{1:s}'.format(first, second))
# TODO: clean up this function as part of dfvfs find integration.
# TODO: a duplicate of this function exists in class: WinRegistryPreprocess
# method: GetValue; merge them.
def _FindRegistryPaths(self, searcher, pattern):
  """Finds the Windows Registry files that match a path pattern.

  The directory part of the pattern is searched for first, after which the
  file name part is searched for in every directory that was found. Both
  searches are case insensitive.

  Args:
    searcher: The file system searcher object (instance of
              dfvfs.FileSystemSearcher).
    pattern: The pattern to find, a path with "/" as segment separator whose
             segments are used as regular expressions.

  Returns:
    A list of the results of the searcher for the matching files, or an
    empty list if no directory matches.

  Raises:
    PreProcessFail: if a directory that was found has no location.
  """
  # TODO: optimize this in one find.
  directory_pattern, _, file_name = pattern.rpartition(u'/')

  # The path is split in segments to make it path segment separator
  # independent (and thus platform independent).
  directory_segments = directory_pattern.split(u'/')
  if not directory_segments[0]:
    # An absolute path yields an empty first segment.
    directory_segments = directory_segments[1:]

  directory_find_spec = file_system_searcher.FindSpec(
      location_regex=directory_segments, case_sensitive=False)
  directory_path_specs = list(
      searcher.Find(find_specs=[directory_find_spec]))

  if not directory_path_specs:
    logging.debug(u'Directory: {0:s} not found'.format(directory_pattern))
    return []

  hive_path_specs = []
  for directory_path_spec in directory_path_specs:
    directory_location = getattr(directory_path_spec, 'location', None)
    if not directory_location:
      raise errors.PreProcessFail(
          u'Missing directory location for: {0:s}'.format(
              directory_pattern))

    # The location of the directory is split by the searcher, to stay
    # independent of the path segment separator, after which the file name
    # pattern is added as the last segment.
    file_segments = searcher.SplitPath(directory_location)
    file_segments.append(file_name)

    file_find_spec = file_system_searcher.FindSpec(
        location_regex=file_segments, case_sensitive=False)
    file_path_specs = list(searcher.Find(find_specs=[file_find_spec]))

    if file_path_specs:
      hive_path_specs.extend(file_path_specs)
    else:
      logging.debug(u'File: {0:s} not found in directory: {1:s}'.format(
          file_name, directory_location))

  return hive_path_specs
def _GetRegistryFilePaths(self, plugin_name=None, registry_type=None):
  """Returns the path patterns of the Registry files of certain types.

  Args:
    plugin_name: optional string containing the name of the plugin or an empty
                 string or None for all the types. Defaults to None. Only
                 used if no Registry type is defined.
    registry_type: optional Registry type string. None by default.

  Returns:
    A list of path names for registry files. The path names can contain
    regular expressions. Paths that cannot be expanded are logged and left
    out of the list.
  """
  if self._parse_restore_points:
    # No trailing path separator here, the file name patterns below are
    # joined with one. A doubled separator results in an empty path segment
    # once the pattern is split on "/".
    restore_path = u'/System Volume Information/_restor.+/RP[0-9]+/snapshot'
  else:
    restore_path = u''

  if registry_type:
    types = [registry_type]
  else:
    types = self._GetRegistryTypes(plugin_name)

  # Per Registry type: the locations on a live file system and the name
  # of the file inside a restore point, None if there is no such file.
  hive_locations = {
      'NTUSER': (
          ('/Documents And Settings/.+/NTUSER.DAT', '/Users/.+/NTUSER.DAT'),
          '_REGISTRY_USER_NTUSER.+'),
      'SOFTWARE': (
          ('{sysregistry}/SOFTWARE',), '_REGISTRY_MACHINE_SOFTWARE'),
      'SYSTEM': (
          ('{sysregistry}/SYSTEM',), '_REGISTRY_MACHINE_SYSTEM'),
      'SECURITY': (
          ('{sysregistry}/SECURITY',), '_REGISTRY_MACHINE_SECURITY'),
      'USRCLASS': (
          ('/Users/.+/AppData/Local/Microsoft/Windows/UsrClass.dat',), None),
      'SAM': (
          ('{sysregistry}/SAM',), '_REGISTRY_MACHINE_SAM'),
  }

  # Gather the Registry files to fetch, types that are not known are
  # ignored.
  paths = []
  for reg_type in types:
    live_paths, restore_name = hive_locations.get(reg_type, ((), None))
    paths.extend(live_paths)
    if restore_path and restore_name:
      paths.append('{0:s}/{1:s}'.format(restore_path, restore_name))

  # Expand all the paths.
  expanded_paths = []
  expander = winreg_path_expander.WinRegistryKeyPathExpander()
  for path in paths:
    try:
      expanded_paths.append(expander.ExpandPath(
          path, pre_obj=PregCache.knowledge_base_object.pre_obj))
    except KeyError as exception:
      logging.error(u'Unable to expand keys with error: {0:s}'.format(
          exception))

  return expanded_paths
def _GetRegistryKeysFromHive(self, hive_helper, parser_context):
"""Retrieves a list of all key plugins for a given Registry type.
Args:
hive_helper: A hive object (instance of PregHiveHelper).
parser_context: A parser context object (instance of ParserContext).
Returns:
A list of Windows Registry keys.
"""
keys = []
if not hive_helper:
return
for key_plugin_cls in self.plugins.GetAllKeyPlugins():
temp_obj = key_plugin_cls(reg_cache=hive_helper.reg_cache)
if temp_obj.REG_TYPE == hive_helper.type:
temp_obj.ExpandKeys(parser_context)
keys.extend(temp_obj.expanded_keys)
return keys
def _GetRegistryPlugins(self, plugin_name):
"""Retrieves the Windows Registry plugins based on a filter string.
Args:
plugin_name: string containing the name of the plugin or an empty
string for all the plugins.
Returns:
A list of Windows Registry plugins.
"""
key_plugin_names = []
for plugin in self.plugins.GetAllKeyPlugins():
temp_obj = plugin(None)
key_plugin_names.append(temp_obj.plugin_name)
if not plugin_name:
return key_plugin_names
plugin_name = plugin_name.lower()
if not plugin_name.startswith('winreg'):
plugin_name = u'winreg_{0:s}'.format(plugin_name)
plugins_to_run = []
for key_plugin in key_plugin_names:
if plugin_name in key_plugin.lower():
plugins_to_run.append(key_plugin)
return plugins_to_run
def _GetRegistryTypes(self, plugin_name):
  """Retrieves the Windows Registry types based on a filter string.

  Args:
    plugin_name: string containing the name of the plugin or an empty
                 string for all the types.

  Returns:
    A list of the Windows Registry types of the key plugins that match the
    filter string, without duplicates.
  """
  reg_cache = cache.WinRegistryCache()
  types = []
  for plugin in self._GetRegistryPlugins(plugin_name):
    for key_plugin_cls in self.plugins.GetAllKeyPlugins():
      key_plugin = key_plugin_cls(reg_cache=reg_cache)
      # Plugin names are strings and have to be compared by value, two
      # equal names are not guaranteed to be the same object.
      if plugin == key_plugin.plugin_name:
        if key_plugin.REG_TYPE not in types:
          types.append(key_plugin.REG_TYPE)
        # Only the first plugin with a matching name is considered.
        break

  return types
def _GetSearchersForImage(self, volume_path_spec):
  """Retrieves the file systems searchers for searching the image.

  A searcher is created for the file system of the volume itself and one
  for every VSS store that is defined in the front-end.

  Args:
    volume_path_spec: The path specification of the volume containing
                      the file system (instance of dfvfs.PathSpec).

  Returns:
    A list of tuples containing a string identifying the file system
    searcher and a file system searcher object (instance of
    dfvfs.FileSystemSearcher). The string is empty for the volume itself.
  """
  def _NewSearcher(mount_point_path_spec):
    """Creates a searcher for the TSK file system below a path spec."""
    file_system_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_TSK, location=u'/',
        parent=mount_point_path_spec)
    file_system = path_spec_resolver.Resolver.OpenFileSystem(
        file_system_path_spec)
    return file_system_searcher.FileSystemSearcher(
        file_system, mount_point_path_spec)

  named_searchers = [(u'', _NewSearcher(volume_path_spec))]

  for store_index in self._vss_stores or []:
    # The VSS path specification is created with the store index minus one,
    # the name of the searcher shows the store index as is.
    vss_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_VSHADOW,
        store_index=store_index - 1, parent=volume_path_spec)
    vss_searcher = _NewSearcher(vss_path_spec)
    searcher_name = u':VSS Store {0:d}'.format(store_index)
    named_searchers.append((searcher_name, vss_searcher))

  return named_searchers
def GetHivesAndCollectors(
self, options, registry_types=None, plugin_names=None):
"""Returns a list of discovered Registry hives and collectors.
Args:
options: the command line arguments (instance of argparse.Namespace).
registry_types: an optional list of Registry types, eg: NTUSER, SAM, etc
that should be included. Defaults to None.
plugin_names: an optional list of strings containing the name of the
plugin(s) or an empty string for all the types. Defaults to
None.
Returns:
A tuple of hives and searchers, where hives is a list that contains
either a string (location of a Registry hive) or path specs (instance of
dfvfs.path.path_spec.PathSpec). The searchers is a list of tuples that
contain the name of the searcher and a searcher object (instance of
dfvfs.helpers.file_system_searcher.FileSystemSearcher) or None (if no
searcher is required).
Raises:
ValueError: If neither registry_types nor plugin name is passed
as a parameter.
BadConfigOption: If the source scanner is unable to complete due to
a source scanner error or back end error in dfvfs.
"""
if registry_types is None and plugin_names is None:
raise ValueError(
u'Missing Registry_types or plugin_name value.')
if plugin_names is None:
plugin_names = []
else:
plugin_names = [plugin_name.lower() for plugin_name in plugin_names]
# TODO: use non-preprocess collector with filter to collect hives.
# TODO: rewrite to always use collector or equiv.
if not self._source_path:
searchers = [(u'', None)]
return registry_types, searchers
try:
self.ScanSource(options)
except errors.SourceScannerError as exception:
raise errors.BadConfigOption((
u'Unable to scan for a supported filesystem with error: {0:s}\n'
u'Most likely the image format is not supported by the '
u'tool.').format(exception))
searchers = self._GetSearchersForImage(self.GetSourcePathSpec().parent)
_, searcher = searchers[0]
# Run preprocessing on image.
platform = preprocess_interface.GuessOS(searcher)
preprocess_manager.PreprocessPluginsManager.RunPlugins(
platform, searcher, PregCache.knowledge_base_object)
# Create the keyword list if plugins are used.
plugins_list = parsers_manager.ParsersManager.GetWindowsRegistryPlugins()
if plugin_names:
if registry_types is None:
registry_types = []
for plugin_name in plugin_names:
if not plugin_name.startswith('winreg_'):
plugin_name = u'winreg_{0:s}'.format(plugin_name)
for plugin_cls in plugins_list.GetAllKeyPlugins():
if plugin_name == plugin_cls.NAME.lower():
# If a plugin is available for every Registry type
# we need to make sure all Registry hives are included.
if plugin_cls.REG_TYPE == u'any':
for available_type in PregHiveHelper.REG_TYPES.iterkeys():
if available_type is u'Unknown':
continue
if available_type not in registry_types:
registry_types.append(available_type)
if plugin_cls.REG_TYPE not in registry_types:
registry_types.append(plugin_cls.REG_TYPE)
# Find all the Registry paths we need to check.
paths = []
if registry_types:
for registry_type in registry_types:
paths.extend(self._GetRegistryFilePaths(
registry_type=registry_type.upper()))
else:
for plugin_name in plugin_names:
paths.extend(self._GetRegistryFilePaths(plugin_name=plugin_name))
hives = []
for path in paths:
hives.extend(self._FindRegistryPaths(searcher, path))
return hives, searchers
def RunModeRegistryKey(self, options, plugin_names):
  """Runs all available plugins against a specific Registry key.

  The hives are determined by means of GetHivesAndCollectors, using the
  Registry file of the options as the Registry type. The key path that was
  set by ParseOptions, and its redirect equivalent if applicable, is parsed
  in every hive and the result is written to the output writer.

  Args:
    options: the command line arguments (instance of argparse.Namespace).
    plugin_names: a list of strings containing the name of the plugin(s) or
                  an empty list for all the types.
  """
  regfile = getattr(options, 'regfile', u'')

  hives, hive_collectors = self.GetHivesAndCollectors(
      options, registry_types=[regfile], plugin_names=plugin_names)

  # Expand the keys paths if there is a need (due to Windows redirect).
  key_paths = [self._key_path]
  self._ExpandKeysRedirect(key_paths)

  shell_helper = PregHelper(options, self, PregStorage())

  # Without discovered hives the Registry file itself is parsed.
  hives_to_parse = [regfile] if hives is None else hives

  for hive in hives_to_parse:
    self._output_writer.Write(self.ParseHive(
        hive, hive_collectors, shell_helper,
        key_paths=key_paths, verbose=self._verbose_output))
def RunModeRegistryPlugin(self, options, plugin_names):
  """Run against a set of Registry plugins.

  The key paths of the plugins are expanded by means of the first hive that
  can be opened, after which every hive is parsed with the plugins and the
  result is written to the output writer. If there is no hive to parse, or
  none of the hives can be opened, an error is logged and nothing is
  written.

  Args:
    options: the command line arguments (instance of argparse.Namespace).
    plugin_names: a list of strings containing the name of the plugin(s) or
                  an empty string for all the types.
  """
  # TODO: Add support for splitting the output to separate files based on
  # each plugin name.
  hives, hive_collectors = self.GetHivesAndCollectors(
      options, plugin_names=plugin_names)

  if hives is None:
    hives = [getattr(options, 'regfile', None)]

  if not hives or not hive_collectors:
    logging.error(u'No Windows Registry hives found to parse.')
    return

  plugin_list = []
  for plugin_name in plugin_names:
    plugin_list.extend(self._GetRegistryPlugins(plugin_name))

  # In order to get all the Registry keys we need to expand
  # them, but to do so we need to open up one hive so that we
  # create the reg_cache object, which is necessary to fully
  # expand all keys.
  _, hive_collector = hive_collectors[0]
  hive_storage = PregStorage()
  shell_helper = PregHelper(options, self, hive_storage)

  # OpenHive returns None for a hive that cannot be opened, hence the first
  # hive that can be opened is used.
  hive_helper = None
  for hive in hives:
    hive_helper = shell_helper.OpenHive(hive, hive_collector)
    if hive_helper:
      break

  if not hive_helper:
    logging.error(
        u'Unable to open a Windows Registry hive to expand the key paths.')
    return

  parser_context = shell_helper.BuildParserContext()

  # Get all the appropriate keys from these plugins.
  key_paths = self.plugins.GetExpandedKeyPaths(
      parser_context, reg_cache=hive_helper.reg_cache,
      plugin_names=plugin_list)

  for hive in hives:
    output_string = self.ParseHive(
        hive, hive_collectors, shell_helper,
        key_paths=key_paths, use_plugins=plugin_list,
        verbose=self._verbose_output)
    self._output_writer.Write(output_string)
def RunModeRegistryFile(self, options, regfile):
  """Run against a Registry file.

  Finds and opens all Registry hives as configured in the configuration
  object and determines the type of Registry file opened. Then it will
  load up all the Registry plugins suitable for that particular Registry
  file, find all Registry keys they are able to parse and run through
  them, one by one.

  Every hive is parsed once per collector and the result is written to the
  output writer. A hive that cannot be opened with a collector is skipped.

  Args:
    options: the command line arguments (instance of argparse.Namespace).
    regfile: A string containing either full path to the Registry hive or
             a keyword to match it.
  """
  # Get all the hives and collectors.
  hives, hive_collectors = self.GetHivesAndCollectors(
      options, registry_types=[regfile])

  hive_storage = PregStorage()
  shell_helper = PregHelper(options, self, hive_storage)
  parser_context = shell_helper.BuildParserContext()

  for hive in hives:
    for collector_name, hive_collector in hive_collectors:
      hive_helper = shell_helper.OpenHive(
          hive, hive_collector=hive_collector,
          hive_collector_name=collector_name)
      if not hive_helper:
        # The hive could not be opened, which was logged by OpenHive.
        continue

      hive_type = hive_helper.type

      key_paths = self._GetRegistryKeysFromHive(hive_helper, parser_context)
      self._ExpandKeysRedirect(key_paths)

      plugins_to_run = self._GetRegistryPlugins(hive_type)

      # The keys and plugins were determined for the hive as seen by this
      # collector, hence only this collector is passed on. Passing on all
      # the collectors would parse the hive once per pair of collectors.
      output_string = self.ParseHive(
          hive, [(collector_name, hive_collector)], shell_helper,
          key_paths=key_paths, use_plugins=plugins_to_run,
          verbose=self._verbose_output)
      self._output_writer.Write(output_string)
class PregHelper(object):
  """Class that defines various helper functions.

  The purpose of this class is to bridge the plaso generated objects
  with the IPython objects, making it easier to create magic classes
  and provide additional helper functions to the IPython shell.
  """

  def __init__(self, tool_options, tool_front_end, hive_storage):
    """Initialize the helper object.

    Args:
      tool_options: A configuration object.
      tool_front_end: A front end object (instance of PregFrontend).
      hive_storage: A hive storage object (instance of PregStorage).
    """
    super(PregHelper, self).__init__()
    self.tool_options = tool_options
    self.tool_front_end = tool_front_end
    self.hive_storage = hive_storage

  def BuildParserContext(self, event_queue=None):
    """Build the parser object.

    Args:
      event_queue: An event queue object (instance of Queue). This is
                   optional and if a queue is not provided a default
                   one will be provided.

    Returns:
      A parser context object (instance of parsers_context.ParserContext)
      that uses the knowledge base of PregCache.
    """
    if event_queue is None:
      event_queue = single_process.SingleProcessQueue()
    event_queue_producer = queue.ItemQueueProducer(event_queue)

    # The parse errors always get a queue of their own.
    parse_error_queue = single_process.SingleProcessQueue()
    parse_error_queue_producer = queue.ItemQueueProducer(parse_error_queue)

    return parsers_context.ParserContext(
        event_queue_producer, parse_error_queue_producer,
        PregCache.knowledge_base_object)

  def OpenHive(
      self, filename_or_path_spec, hive_collector, hive_collector_name=None,
      codepage='cp1252'):
    """Open a Registry hive based on a collector or a filename.

    The codepage is set as the default codepage of the knowledge base of
    PregCache before the hive is opened.

    Args:
      filename_or_path_spec: file path to the hive as a string or a path spec
                             object (instance of dfvfs.path.path_spec.PathSpec)
      hive_collector: the collector to use (instance of
                      dfvfs.helpers.file_system_searcher.FileSystemSearcher)
      hive_collector_name: optional string denoting the name of the collector
                           used. The default value is None.
      codepage: the default codepage, default is cp1252.

    Returns:
      A hive helper object (instance of PregHiveHelper) or None if the hive
      cannot be opened, in which case an error is logged.
    """
    PregCache.knowledge_base_object.SetDefaultCodepage(codepage)

    if isinstance(filename_or_path_spec, basestring):
      filename = filename_or_path_spec
      path_spec = None
    else:
      filename = filename_or_path_spec.location
      path_spec = filename_or_path_spec

    if not hive_collector:
      # Without a collector the hive is a file of the operating system.
      path_spec = path_spec_factory.Factory.NewPathSpec(
          dfvfs_definitions.TYPE_INDICATOR_OS, location=filename)
      file_entry = path_spec_resolver.Resolver.OpenFileEntry(path_spec)
    else:
      file_entry = hive_collector.GetFileEntryByPathSpec(path_spec)

    win_registry = winregistry.WinRegistry(
        winregistry.WinRegistry.BACKEND_PYREGF)

    try:
      hive_object = win_registry.OpenFile(
          file_entry, codepage=PregCache.knowledge_base_object.codepage)
    except IOError:
      if filename is not None:
        filename_string = filename
      elif path_spec:
        filename_string = path_spec.location
      else:
        filename_string = u'unknown file path'

      logging.error(
          u'Unable to open Registry hive: {0:s} [{1:s}]'.format(
              filename_string, hive_collector_name))
      return

    return PregHiveHelper(
        hive_object, file_entry=file_entry, collector_name=hive_collector_name)

  def Scan(self, registry_types):
    """Scan for available hives using keyword.

    Every hive that is discovered by the front-end is opened with every
    collector and the hives that can be opened are appended to the hive
    storage. A message is printed if there is nothing to scan for or if no
    hives are discovered.

    Args:
      registry_types: A list of keywords to scan for, eg: "NTUSER",
                      "SOFTWARE", etc.
    """
    if not registry_types:
      print(
          u'Unable to scan for an empty keyword. Please specify a keyword, '
          u'eg: NTUSER, SOFTWARE, etc')
      return

    hives, collectors = self.tool_front_end.GetHivesAndCollectors(
        self.tool_options, registry_types=registry_types)

    if not hives:
      print(u'No new discovered hives.')
      return

    # A single hive is handled as a list of one hive. isinstance is used so
    # that sub classes of list and tuple are handled as sequences as well.
    if not isinstance(hives, (list, tuple)):
      hives = [hives]

    for hive in hives:
      for name, collector in collectors:
        hive_helper = self.OpenHive(
            hive, hive_collector=collector, hive_collector_name=name)
        if hive_helper:
          self.hive_storage.AppendHive(hive_helper)
class PregHiveHelper(object):
  """Helper that wraps a Registry hive and keeps track of the loaded key."""

  _currently_loaded_registry_key = ''
  _hive = None
  _hive_type = u'UNKNOWN'

  collector_name = None
  file_entry = None
  path_expander = None
  reg_cache = None

  # Per Registry type the key paths that all have to be present in a hive
  # for the hive to be considered to be of that type.
  REG_TYPES = {
      u'NTUSER': ('\\Software\\Microsoft\\Windows\\CurrentVersion\\Explorer',),
      u'SOFTWARE': ('\\Microsoft\\Windows\\CurrentVersion\\App Paths',),
      u'SECURITY': ('\\Policy\\PolAdtEv',),
      u'SYSTEM': ('\\Select',),
      u'SAM': ('\\SAM\\Domains\\Account\\Users',),
      u'USRCLASS': (
          '\\Local Settings\\Software\\Microsoft\\Windows\\CurrentVersion',),
      u'UNKNOWN': (),
  }

  @property
  def name(self):
    """The name of the hive or N/A if the hive has no name."""
    return getattr(self._hive, 'name', u'N/A')

  @property
  def path(self):
    """The location of the file entry of the hive or N/A if not available."""
    path_spec = getattr(self.file_entry, 'path_spec', None)
    if path_spec:
      return getattr(path_spec, 'location', u'N/A')
    return u'N/A'

  @property
  def root_key(self):
    """The root key of the Registry hive."""
    return self._hive.GetKeyByPath(u'\\')

  @property
  def type(self):
    """The hive type, one of the keys of REG_TYPES."""
    return self._hive_type

  def __init__(self, hive, file_entry, collector_name):
    """Initialize the Registry hive helper.

    The type of the hive is determined, the Registry cache is built and the
    root key is loaded as the current key.

    Args:
      hive: A hive object (instance of WinPyregfFile).
      file_entry: A file entry object (instance of dfvfs.FileEntry).
      collector_name: Name of the collector used as a string.
    """
    self._hive = hive
    self.file_entry = file_entry
    self.collector_name = collector_name

    # The cache is built for a specific hive type, hence the type has to be
    # determined first.
    self._SetHiveType()
    self._BuildHiveCache()

    # Loading the root key makes it the currently loaded key.
    self.GetKeyByPath(u'\\')

  def _BuildHiveCache(self):
    """Builds the Registry cache and the path expander that uses it."""
    registry_cache = cache.WinRegistryCache()
    self.reg_cache = registry_cache
    registry_cache.BuildCache(self._hive, self._hive_type)

    self.path_expander = winreg_path_expander.WinRegistryKeyPathExpander(
        reg_cache=registry_cache)

  def _SetHiveType(self):
    """Detects the hive type and stores it.

    The hive type is left unchanged if the hive matches none of the types.
    """
    get_key_by_path = self._hive.GetKeyByPath

    for reg_type, reg_keys in self.REG_TYPES.items():
      if reg_type == u'UNKNOWN':
        continue

      # For a hive to be considered a specific type all of the keys need to
      # be found, the look up stops at the first key that is missing.
      if all(get_key_by_path(reg_key) for reg_key in reg_keys):
        self._hive_type = reg_type
        return

  def GetCurrentRegistryKey(self):
    """Return the currently loaded Registry key."""
    return self._currently_loaded_registry_key

  def GetCurrentRegistryPath(self):
    """Return the loaded Registry key path or None if no key is loaded."""
    current_key = self._currently_loaded_registry_key
    if current_key:
      return current_key.path
    return None

  def GetKeyByPath(self, path):
    """Retrieves a specific key defined by the Registry path.

    A key that is found becomes the currently loaded Registry key.

    Args:
      path: the Registry path.

    Returns:
      The key (instance of WinRegKey) if available or None otherwise.
    """
    if not path:
      return None

    registry_key = self._hive.GetKeyByPath(path)
    if not registry_key:
      return None

    self._currently_loaded_registry_key = registry_key
    return registry_key
class PregStorage(object):
  """Class for storing discovered hives."""

  # Index number of the currently loaded Registry hive, -1 if none is loaded.
  _current_index = -1
  _currently_loaded_hive = None

  def __init__(self):
    """Initialize an empty hive storage."""
    super(PregStorage, self).__init__()
    # The list needs to be created per instance: a class level list would be
    # mutated in place by AppendHive and thus shared between all storages.
    self._hive_list = []

  @property
  def loaded_hive(self):
    """Return the currently loaded hive or None if no hive loaded."""
    if not self._currently_loaded_hive:
      return None
    return self._currently_loaded_hive

  def __len__(self):
    """Return the number of available hives."""
    return len(self._hive_list)

  def AppendHive(self, hive_helper):
    """Append a hive object to the Registry hive storage.

    Args:
      hive_helper: A hive object (instance of PregHiveHelper)
    """
    self._hive_list.append(hive_helper)

  def AppendHives(self, hive_helpers):
    """Append hives to the Registry hive storage.

    Args:
      hive_helpers: A list or tuple of hive objects (instance of
                    PregHiveHelper). A single hive object is accepted as
                    well.
    """
    if not isinstance(hive_helpers, (list, tuple)):
      hive_helpers = [hive_helpers]

    self._hive_list.extend(hive_helpers)

  def ListHives(self):
    """Return a string with a list of all available hives and collectors.

    The currently open hive is marked with an asterisk in front of its path.

    Returns:
      A string with a list of all available hives and collectors. If there are
      no loaded hives None will be returned.
    """
    if not self._hive_list:
      return None

    return_strings = [u'Index Hive [collector]']
    for index, hive in enumerate(self._hive_list):
      collector = hive.collector_name
      if not collector:
        collector = u'Currently Allocated'

      star = u'*' if self._current_index == index else u''
      return_strings.append(u'{0:<5d} {1:s}{2:s} [{3:s}]'.format(
          index, star, hive.path, collector))

    return u'\n'.join(return_strings)

  def SetOpenHive(self, hive_index):
    """Set the current open hive.

    An error is printed, and the open hive is left unchanged, if the index
    cannot be interpreted or does not refer to a stored hive.

    Args:
      hive_index: An index (integer) into the hive list, or a string
                  containing the decimal representation of the index.
    """
    if not self._hive_list:
      return

    index = hive_index
    if isinstance(hive_index, basestring):
      try:
        index = int(hive_index, 10)
      except ValueError:
        print(u'Wrong hive index, value should be decimal.')
        return

    try:
      hive_helper = self._hive_list[index]
    except IndexError:
      print(u'Hive not found, index out of range?')
      return

    # Negative indexes count from the end of the list; store the equivalent
    # positive index so that ListHives is able to mark the open hive.
    self._current_index = index % len(self._hive_list)
    self._currently_loaded_hive = hive_helper
def CdCompleter(unused_self, unused_event):
  """Completer function for the cd command, returning back sub keys.

  Args:
    unused_self: the object the completer hook is invoked on, not used.
    unused_event: the completion event, not used.

  Returns:
    A list with the names of the sub keys of the currently loaded Registry
    key, or an empty list if no hive or no key is loaded.
  """
  current_hive = PregCache.hive_storage.loaded_hive
  if not current_hive:
    # Nothing to complete against; pressing tab should not raise.
    return []

  current_key = current_hive.GetCurrentRegistryKey()
  if not current_key:
    return []

  return [key.name for key in current_key.GetSubkeys()]
def PluginCompleter(unused_self, event_object):
  """Completer function that returns a list of available plugins.

  Args:
    unused_self: the object the completer hook is invoked on, not used.
    event_object: the completion event, its line attribute contains the
                  command line typed so far.

  Returns:
    A list of completion candidates: the -h switch, unless already present on
    the line, followed by the shortened names of the key plugins that apply
    to the type of the loaded hive. The list is empty if no hive is loaded.
  """
  completions = []
  if not IsLoaded():
    return completions

  if '-h' not in event_object.line:
    completions.append('-h')

  registry_plugins = (
      parsers_manager.ParsersManager.GetWindowsRegistryPlugins())
  hive_helper = PregCache.hive_storage.loaded_hive
  hive_type = hive_helper.type

  for plugin_class in registry_plugins.GetKeyPlugins(hive_type):
    plugin_object = plugin_class(reg_cache=hive_helper.reg_cache)

    short_name = plugin_object.plugin_name
    if short_name.startswith('winreg'):
      # Drop the common prefix of the plugin name.
      short_name = short_name[PregFrontend.PLUGIN_UNIQUE_NAME_START:]

    if short_name != 'default':
      completions.append(short_name)

  return completions
def VerboseCompleter(unused_self, event_object):
  """Completer function that suggests simple verbose settings.

  Args:
    unused_self: the object the completer hook is invoked on, not used.
    event_object: the completion event, its line attribute contains the
                  command line typed so far.

  Returns:
    A list containing -v, or an empty list if -v is already on the line.
  """
  if '-v' not in event_object.line:
    return ['-v']
  return []
@magic.magics_class
class MyMagics(magic.Magics):
  """A simple class holding all magic functions for console.

  The magic functions operate on the hive that is currently loaded in
  PregCache.hive_storage and write their results to output_writer, unless
  noted otherwise.
  """

  EXPANSION_KEY_OPEN = r'{'
  EXPANSION_KEY_CLOSE = r'}'

  # Match against one instance, not two of the expansion key.
  EXPANSION_RE = re.compile(r'{0:s}{{1}}[^{1:s}]+?{1:s}'.format(
      EXPANSION_KEY_OPEN, EXPANSION_KEY_CLOSE))

  output_writer = sys.stdout

  @magic.line_magic('cd')
  def ChangeDirectory(self, key):
    """Change between Registry keys, like a directory tree.

    The key path can either be an absolute path or a relative one.
    Absolute paths can use '.' and '..' to denote current and parent
    directory/key path.

    Args:
      key: The path to the key to traverse to. An empty path changes to the
           root key.
    """
    registry_key = None
    key_path = key

    if not key:
      # The recursive call does all the work, there is nothing left to do
      # for the empty path itself.
      self.ChangeDirectory('\\')
      return

    loaded_hive = PregCache.hive_storage.loaded_hive
    if not loaded_hive:
      return

    # Check if we need to expand environment attributes.
    match = self.EXPANSION_RE.search(key)
    if match and u'{0:s}{0:s}'.format(
        self.EXPANSION_KEY_OPEN) not in match.group(0):
      try:
        key = loaded_hive.path_expander.ExpandPath(
            key, pre_obj=PregCache.knowledge_base_object.pre_obj)
      except (KeyError, IndexError):
        # Expansion is best effort, fall back to the path as typed.
        pass

    if key.startswith(u'\\'):
      registry_key = loaded_hive.GetKeyByPath(key)

    elif key == '.':
      return

    elif key.startswith(u'.\\'):
      current_path = loaded_hive.GetCurrentRegistryPath()
      _, _, key_path = key.partition(u'\\')
      registry_key = loaded_hive.GetKeyByPath(u'{0:s}\\{1:s}'.format(
          current_path, key_path))

    elif key.startswith(u'..'):
      current_path = loaded_hive.GetCurrentRegistryPath() or u''
      parent_path, _, _ = current_path.rpartition(u'\\')

      # We know the path starts with a "..".
      if len(key) == 2:
        key_path = u''
      else:
        # Skip the ".." and the path separator that follows it.
        key_path = key[3:]

      if parent_path:
        if key_path:
          path = u'{0:s}\\{1:s}'.format(parent_path, key_path)
        else:
          path = parent_path
        registry_key = loaded_hive.GetKeyByPath(path)
      else:
        registry_key = loaded_hive.GetKeyByPath(u'\\{0:s}'.format(key_path))

    else:
      # Check if key is not set at all, then assume traversal from root.
      if not loaded_hive.GetCurrentRegistryPath():
        _ = loaded_hive.GetKeyByPath(u'\\')

      current_key = loaded_hive.GetCurrentRegistryKey()
      if current_key.name == loaded_hive.root_key.name:
        key_path = u'\\{0:s}'.format(key)
      else:
        key_path = u'{0:s}\\{1:s}'.format(current_key.path, key)
      registry_key = loaded_hive.GetKeyByPath(key_path)

    if registry_key:
      if key_path == '\\':
        path = '\\'
      else:
        path = registry_key.path

      # The path ends up in a format string, hence curly braces and
      # backslashes need to be escaped.
      ConsoleConfig.SetPrompt(
          hive_path=loaded_hive.path,
          prepend_string=StripCurlyBrace(path).replace('\\', '\\\\'))
    else:
      print(u'Unable to change to: {0:s}'.format(key_path))

  @magic.line_magic('hive')
  def HiveActions(self, line):
    """Define the hive command on the console prompt.

    Supported sub commands are: list, open INDEX (or load INDEX) and
    scan KEYWORD [KEYWORD ...].

    Args:
      line: the remainder of the command line, starting with the sub command.
    """
    if line.startswith('list'):
      hive_listing = PregCache.hive_storage.ListHives()
      if hive_listing is None:
        print(u'No hives available.')
        return

      print(hive_listing)
      print(u'')
      print(u'To open a hive, use: hive open INDEX')

    elif line.startswith('open ') or line.startswith('load '):
      # Both sub commands are 4 characters long followed by a space.
      PregCache.hive_storage.SetOpenHive(line[5:])
      hive_helper = PregCache.hive_storage.loaded_hive
      if not hive_helper:
        # SetOpenHive was unable to open a hive, e.g. due to a wrong index.
        return

      print(u'Opening hive: {0:s} [{1:s}]'.format(
          hive_helper.path, hive_helper.collector_name))
      ConsoleConfig.SetPrompt(hive_path=hive_helper.path)

    elif line.startswith('scan'):
      items = line.split()
      if len(items) < 2:
        print (
            u'Unable to scan for an empty keyword. Please specify a keyword, '
            u'eg: NTUSER, SOFTWARE, etc')
        return

      # Scanning is implemented by the shell helper; the hive storage only
      # stores the hives that were found.
      PregCache.shell_helper.Scan(items[1:])

  @magic.line_magic('ls')
  def ListDirectoryContent(self, line):
    """List all subkeys and values of the current key.

    Args:
      line: the remainder of the command line. The value data is included in
            the output if it contains "-v" or "true" (case insensitive).
    """
    if not IsLoaded():
      return

    lowered_line = line.lower()
    verbose = 'true' in lowered_line or '-v' in lowered_line

    sub = []
    current_hive = PregCache.hive_storage.loaded_hive
    if not current_hive:
      return

    current_key = current_hive.GetCurrentRegistryKey()
    for key in current_key.GetSubkeys():
      # TODO: move this construction into a separate function in OutputWriter.
      timestamp, _, _ = frontend_utils.OutputWriter.GetDateTimeString(
          key.last_written_timestamp).partition('.')

      sub.append((u'{0:>19s} {1:>15s}  {2:s}'.format(
          timestamp.replace('T', ' '), '[KEY]',
          key.name), True))

    for value in current_key.GetValues():
      if not verbose:
        sub.append((u'{0:>19s} {1:>14s}]  {2:s}'.format(
            u'', '[' + value.data_type_string, value.name), False))
        continue

      if value.DataIsString():
        value_string = u'{0:s}'.format(value.data)
      elif value.DataIsInteger():
        value_string = u'{0:d}'.format(value.data)
      elif value.DataIsMultiString():
        value_string = u'{0:s}'.format(u''.join(value.data))
      elif value.DataIsBinaryData():
        hex_string = binascii.hexlify(value.data)
        # Only the first line (32 hexadecimal characters) is printed, data
        # shorter than that is padded to fill the line.
        if hex_string and len(hex_string) < 32:
          hex_string = hex_string.ljust(32)

        value_string = frontend_utils.OutputWriter.GetHexDumpLine(
            hex_string, 0)
      else:
        value_string = u''

      sub.append((
          u'{0:>19s} {1:>14s}]  {2:<25s}  {3:s}'.format(
              u'', '[' + value.data_type_string, value.name, value_string),
          False))

    for entry, subkey in sorted(sub):
      if subkey:
        self.output_writer.write(u'dr-xr-xr-x {0:s}\n'.format(entry))
      else:
        self.output_writer.write(u'-r-xr-xr-x {0:s}\n'.format(entry))

  @magic.line_magic('parse')
  def ParseCurrentKey(self, line):
    """Parse the current key.

    Args:
      line: the remainder of the command line. A hex dump of all binary
            values is included in the output if it contains "-v" or "true"
            (case insensitive).
    """
    lowered_line = line.lower()
    verbose = 'true' in lowered_line or '-v' in lowered_line

    if not IsLoaded():
      return

    current_hive = PregCache.hive_storage.loaded_hive
    if not current_hive:
      return

    # Clear the last results from parse key.
    PregCache.events_from_last_parse = []

    # ParseKey may return None when there is nothing to parse.
    print_strings = ParseKey(
        key=current_hive.GetCurrentRegistryKey(), hive_helper=current_hive,
        shell_helper=PregCache.shell_helper, verbose=verbose) or []
    self.output_writer.write(u'\n'.join(print_strings))

    # Print out a hex dump of all binary values.
    if verbose:
      header_shown = False
      for value in current_hive.GetCurrentRegistryKey().GetValues():
        if not value.DataIsBinaryData():
          continue

        if not header_shown:
          header_shown = True
          # The header goes to the output writer, like the rest of the
          # dump, so that it ends up in the file when output is redirected.
          self.output_writer.write(frontend_utils.FormatHeader('Hex Dump'))
          self.output_writer.write(u'\n')

        # Print '-' 80 times.
        self.output_writer.write(u'-'*80)
        self.output_writer.write(u'\n')
        self.output_writer.write(
            frontend_utils.FormatOutputString('Attribute', value.name))
        self.output_writer.write(u'-'*80)
        self.output_writer.write(u'\n')
        self.output_writer.write(
            frontend_utils.OutputWriter.GetHexDump(value.data))
        self.output_writer.write(u'\n')
        self.output_writer.write(u'+-'*40)
        self.output_writer.write(u'\n')

    self.output_writer.flush()

  @magic.line_magic('plugin')
  def ParseWithPlugin(self, line):
    """Parse a Registry key using a specific plugin.

    Args:
      line: the remainder of the command line: the name of the plugin,
            optionally combined with -h to show information about the plugin
            instead of running it.
    """
    if not IsLoaded():
      print(u'No hive loaded, unable to parse.')
      return

    current_hive = PregCache.hive_storage.loaded_hive
    if not current_hive:
      return

    if not line:
      print(u'No plugin name added.')
      return

    plugin_name = line
    if '-h' in line:
      items = line.split()
      if len(items) != 2:
        print(u'Wrong usage: plugin [-h] PluginName')
        return
      if items[0] == '-h':
        plugin_name = items[1]
      else:
        plugin_name = items[0]

    if not plugin_name.startswith('winreg'):
      plugin_name = u'winreg_{0:s}'.format(plugin_name)

    hive_type = current_hive.type
    plugins_list = parsers_manager.ParsersManager.GetWindowsRegistryPlugins()
    plugin_found = False
    for plugin_cls in plugins_list.GetKeyPlugins(hive_type):
      plugin = plugin_cls(reg_cache=current_hive.reg_cache)
      if plugin.plugin_name == plugin_name:
        # If we found the correct plugin.
        plugin_found = True
        break

    if not plugin_found:
      print(u'No plugin named: {0:s} available for Registry type {1:s}'.format(
          plugin_name, hive_type))
      return

    if not hasattr(plugin, 'REG_KEYS'):
      print(u'Plugin: {0:s} has no key information.'.format(line))
      return

    # The plugin was just created, expand its keys before they are used,
    # either for the help output or for parsing.
    if not plugin.expanded_keys:
      plugin.ExpandKeys(PregCache.parser_context)

    if '-h' in line:
      print(frontend_utils.FormatHeader(plugin_name))
      print(frontend_utils.FormatOutputString('Description', plugin.__doc__))
      print(u'')
      for registry_key in plugin.expanded_keys or []:
        print(frontend_utils.FormatOutputString('Registry Key', registry_key))
      return

    # Clear the last results from parse key.
    PregCache.events_from_last_parse = []

    # Defining outside of for loop for optimization.
    get_key_by_path = current_hive.GetKeyByPath
    for registry_key in plugin.expanded_keys or []:
      key = get_key_by_path(registry_key)
      if not key:
        print(u'Key: {0:s} not found'.format(registry_key))
        continue

      # Move the current location to the key to be parsed.
      self.ChangeDirectory(registry_key)
      # Parse the key, ParseKey may return None when there is nothing to
      # parse.
      print_strings = ParseKey(
          key=current_hive.GetCurrentRegistryKey(), hive_helper=current_hive,
          shell_helper=PregCache.shell_helper, verbose=False,
          use_plugins=[plugin_name]) or []
      self.output_writer.write(u'\n'.join(print_strings))
    self.output_writer.flush()

  @magic.line_magic('pwd')
  def PrintCurrentWorkingDirectory(self, unused_line):
    """Print the current path.

    Args:
      unused_line: the remainder of the command line, not used.
    """
    if not IsLoaded():
      return

    current_hive = PregCache.hive_storage.loaded_hive
    if not current_hive:
      return

    self.output_writer.write(u'{0:s}\n'.format(
        current_hive.GetCurrentRegistryPath()))

  @magic.line_magic('redirect_output')
  def RedirectOutput(self, output_object):
    """Change the output writer to redirect plugin output to a file.

    Args:
      output_object: either the path of the file to write to, as a string, or
                     an object that has a write method. Anything else is
                     ignored.
    """
    if isinstance(output_object, basestring):
      # NOTE: the file stays open for as long as it is the output writer, it
      # is not closed when the output is redirected again.
      output_object = open(output_object, 'wb')

    if hasattr(output_object, 'write'):
      self.output_writer = output_object
def StripCurlyBrace(string):
  """Return a copy of the string that is safe to use in a format string.

  Args:
    string: the string to escape.

  Returns:
    The string with every curly brace doubled.
  """
  escaped = string
  for brace in ('{', '}'):
    escaped = escaped.replace(brace, brace * 2)
  return escaped
def IsLoaded():
  """Checks if a Windows Registry Hive is loaded.

  A hint on how to load a hive is printed if a hive object is present but it
  has neither a loaded key nor a name.

  Returns:
    True if a hive is loaded, False otherwise.
  """
  hive_helper = PregCache.hive_storage.loaded_hive
  if hive_helper:
    if hasattr(hive_helper.GetCurrentRegistryKey(), 'path'):
      return True

    if hive_helper.name != 'N/A':
      return True

    print (
        u'No hive loaded, cannot complete action. Use "hive list" '
        u'and "hive open" to load a hive.')

  return False
def GetValue(value_name):
  """Return a value object from the currently loaded Registry key.

  Args:
    value_name: A string containing the name of the value to be retrieved.

  Returns:
    The Registry value (instance of WinPyregfValue) if it exists, None if
    either there is no currently loaded Registry hive or key or if the value
    does not exist.
  """
  current_hive = PregCache.hive_storage.loaded_hive
  if not current_hive:
    # This function is exposed in the console, where it can be called
    # before any hive has been opened.
    return None

  current_key = current_hive.GetCurrentRegistryKey()
  if not current_key:
    return None

  return current_key.GetValue(value_name)
def GetValueData(value_name):
  """Return the data of a value in the currently loaded Registry key.

  Args:
    value_name: A string containing the name of the value to be retrieved.

  Returns:
    The data from a Registry value if it exists, None if either there is no
    currently loaded Registry key or if the value does not exist.
  """
  registry_value = GetValue(value_name)
  return registry_value.data if registry_value else None
def GetCurrentKey():
  """Return the currently loaded Registry key (instance of WinPyregfKey).

  Returns:
    The currently loaded Registry key (instance of WinPyregfKey), whatever
    the loaded hive reports as its current key, or None if there is no
    loaded hive.
  """
  current_hive = PregCache.hive_storage.loaded_hive
  if not current_hive:
    # This function is exposed in the console, where it can be called
    # before any hive has been opened.
    return None

  return current_hive.GetCurrentRegistryKey()
def GetFormatString(event_object):
  """Return a format string that can be used to print an event object.

  The format string right aligns an attribute name (argument 0) and prints
  the attribute value (argument 1) after it.

  Args:
    event_object: An event object (instance of event.EventObject).

  Returns:
    The format string. The width of the name column is the length of the
    longest attribute name that is longer than 15 and shorter than 30
    characters, or 15 if there is no such name.
  """
  default_align_length = 15
  maximum_align_length = 30

  if hasattr(event_object, 'regvalue'):
    attribute_names = event_object.regvalue.keys()
  else:
    attribute_names = event_object.GetAttributes().difference(
        event_object.COMPARE_EXCLUDE)

  # Names that do not fit the allowed range do not influence the width.
  name_lengths = [len(name) for name in attribute_names]
  align_length = max([default_align_length] + [
      length for length in name_lengths
      if default_align_length < length < maximum_align_length])

  return u'{{0:>{0:d}s}} : {{1!s}}'.format(align_length)
def GetEventHeader(event_object, descriptions, exclude_timestamp):
  """Returns a list of strings that contains a header for the event.

  Args:
    event_object: An event object (instance of event.EventObject).
    descriptions: A list of strings describing the value of the header
                  timestamp.
    exclude_timestamp: A boolean. If it is set to True the method
                       will not include the timestamp in the header.

  Returns:
    A list of strings containing header information for the event.
  """
  format_string = GetFormatString(event_object)

  # Create the strings to return.
  ret_strings = [u'Key information.']

  if not exclude_timestamp:
    timestamp_string = timelib.Timestamp.CopyToIsoFormat(
        event_object.timestamp)
    for description in descriptions:
      ret_strings.append(format_string.format(description, timestamp_string))

  if hasattr(event_object, 'keyname'):
    ret_strings.append(format_string.format(u'Key Path', event_object.keyname))

  # Not every event object has a timestamp description; treat a missing
  # description as a written time, the same way GetEventBody does.
  timestamp_description = getattr(
      event_object, 'timestamp_desc', eventdata.EventTimestamp.WRITTEN_TIME)
  if timestamp_description != eventdata.EventTimestamp.WRITTEN_TIME:
    ret_strings.append(format_string.format(
        u'Description', timestamp_description))

  ret_strings.append(frontend_utils.FormatHeader(u'Data', u'-'))

  return ret_strings
def GetEventBody(event_object, file_entry=None, show_hex=False):
  """Returns a list of strings containing information from an event.

  Args:
    event_object: An event object (instance of event.EventObject).
    file_entry: An optional file entry object (instance of dfvfs.FileEntry) that
                the event originated from. Default is None.
    show_hex: A boolean, if set to True hex dump of the value is included in
              the output. The default value is False.

  Returns:
    A list of strings containing the event body.
  """
  written_time = eventdata.EventTimestamp.WRITTEN_TIME
  format_string = GetFormatString(event_object)

  body_strings = []

  # Only descriptions that differ from the written time are worth printing.
  description = getattr(event_object, 'timestamp_desc', written_time)
  if description != written_time:
    body_strings.append(u'<{0:s}>'.format(description))

  if hasattr(event_object, 'regvalue'):
    attributes = event_object.regvalue
  else:
    # TODO: Add a function for this to avoid repeating code.
    attribute_names = event_object.GetAttributes().difference(
        event_object.COMPARE_EXCLUDE)
    attribute_names.difference_update(('offset', 'timestamp_desc'))

    attributes = dict(
        (name, getattr(event_object, name)) for name in attribute_names)

  body_strings.extend(
      format_string.format(name, value)
      for name, value in attributes.items())

  if show_hex and file_entry:
    # The hex dump reads the event data by means of the path specification.
    event_object.pathspec = file_entry.path_spec
    hex_header = frontend_utils.FormatHeader(u'Hex Output From Event.', '-')
    hex_dump = frontend_utils.OutputWriter.GetEventDataHexDump(event_object)
    body_strings.append(hex_header)
    body_strings.append(hex_dump)

  return body_strings
def GetRangeForAllLoadedHives():
  """Return a range with the index of every loaded Registry hive."""
  number_of_hives = GetTotalNumberOfLoadedHives()
  return range(number_of_hives)
def GetTotalNumberOfLoadedHives():
  """Return the total number of Registry hives that are loaded."""
  hive_storage = PregCache.hive_storage
  return len(hive_storage)
def ParseKey(key, shell_helper, hive_helper, verbose=False, use_plugins=None):
  """Parse a single Registry key and return parsed information.

  Parses the Registry key either using the supplied plugin or trying against
  all available plugins. Every event that is produced is also appended to
  PregCache.events_from_last_parse.

  Args:
    key: The Registry key to parse, WinRegKey object or a string.
    shell_helper: A shell helper object (instance of PregHelper).
    hive_helper: A hive object (instance of PregHiveHelper).
    verbose: Print additional information, such as a hex dump.
    use_plugins: A list of plugin names to use, or none if all should be used.

  Returns:
    A list of strings, which is empty if there is no hive or no key to parse.
  """
  print_strings = []
  if not hive_helper:
    # Callers join the result, hence a list is returned in every case.
    return print_strings

  if isinstance(key, basestring):
    key = hive_helper.GetKeyByPath(key)

  if not key:
    return print_strings

  # Detect Registry type.
  registry_type = hive_helper.type

  plugins = {}
  plugins_list = parsers_manager.ParsersManager.GetWindowsRegistryPlugins()

  # Compile a list of plugins we are about to use.
  for weight in plugins_list.GetWeights():
    plugins[weight] = []
    for plugin_cls in plugins_list.GetWeightPlugins(weight, registry_type):
      plugin_obj = plugin_cls(reg_cache=hive_helper.reg_cache)
      if not use_plugins or plugin_obj.NAME in use_plugins:
        plugins[weight].append(plugin_obj)

  event_queue = single_process.SingleProcessQueue()
  event_queue_consumer = PregEventObjectQueueConsumer(event_queue)

  # Build a parser context.
  parser_context = shell_helper.BuildParserContext(event_queue)

  # Run all the plugins in the correct order of weight. A dictionary does not
  # guarantee an order, hence the weights are sorted explicitly.
  # NOTE(review): assumes a lower weight needs to run first - confirm.
  for weight in sorted(plugins):
    for plugin in plugins[weight]:
      plugin.Process(parser_context, key=key)
      event_queue_consumer.ConsumeEventObjects()
      if not event_queue_consumer.event_objects:
        continue

      print_strings.append(u'')
      print_strings.append(
          u'{0:^80}'.format(u' ** Plugin : {0:s} **'.format(
              plugin.plugin_name)))
      print_strings.append(u'')
      print_strings.append(u'[{0:s}] {1:s}'.format(
          plugin.REG_TYPE, plugin.DESCRIPTION))
      print_strings.append(u'')
      if plugin.URLS:
        print_strings.append(u'Additional information can be found here:')

        for url in plugin.URLS:
          print_strings.append(u'{0:>17s} {1:s}'.format(u'URL :', url))
        print_strings.append(u'')

      # TODO: move into the event queue consumer.
      # Take the events out of the consumer, so that the next plugin starts
      # with an empty list, and group them by timestamp.
      event_objects = list(event_queue_consumer.event_objects)
      del event_queue_consumer.event_objects[:]

      event_objects_and_timestamps = {}
      for event_object in event_objects:
        PregCache.events_from_last_parse.append(event_object)
        event_objects_and_timestamps.setdefault(
            event_object.timestamp, []).append(event_object)

      # If there is only a single timestamp then we'll include it in the
      # header, otherwise each event will have it's own timestamp.
      exclude_timestamp_in_header = len(event_objects_and_timestamps) > 1

      for index, event_timestamp in enumerate(
          sorted(event_objects_and_timestamps)):
        timestamp_events = event_objects_and_timestamps[event_timestamp]

        if index == 0:
          # The header is printed once, based on the earliest events.
          descriptions = set(
              getattr(event_object, 'timestamp_desc', u'')
              for event_object in timestamp_events)
          print_strings.extend(GetEventHeader(
              timestamp_events[0], list(descriptions),
              exclude_timestamp_in_header))

        if exclude_timestamp_in_header:
          print_strings.append(u'')
          print_strings.append(u'[{0:s}]'.format(
              timelib.Timestamp.CopyToIsoFormat(event_timestamp)))

        for event_object in timestamp_events:
          print_strings.append(u'')
          print_strings.extend(GetEventBody(
              event_object, hive_helper.file_entry, verbose))

      print_strings.append(u'')

  # Printing '*' 80 times.
  print_strings.append(u'*'*80)
  print_strings.append(u'')

  return print_strings
# TODO: Move this to dfVFS and improve.
def PathExists(file_path):
  """Determine whether given file path exists as a file, directory or a device.

  Args:
    file_path: A string denoting the file path that needs checking.

  Returns:
    A tuple, a boolean indicating whether or not the path exists and a string
    that contains the reason, if any, why this was not determined to be a file.
  """
  if not os.path.exists(file_path):
    # The path may still refer to a storage media device.
    try:
      is_device = pysmdev.check_device(file_path)
    except IOError as exception:
      return False, u'Unable to determine, with error: {0:s}'.format(exception)

    if not is_device:
      return False, u'Not an existing file.'

  return True, u''
def RunModeConsole(front_end, options):
  """Open up an iPython console.

  The hives matching the requested Registry types are scanned for first. If
  exactly one hive is found it is opened right away, otherwise the available
  hives are listed in the banner of the console.

  Args:
    front_end: the preg front-end object (instance of PregFrontend).
    options: the command line arguments (instance of argparse.Namespace).
  """
  namespace = {}

  function_name_length = 23
  banners = []
  banners.append(frontend_utils.FormatHeader(
      u'Welcome to PREG - home of the Plaso Windows Registry Parsing.'))
  banners.append(u'')
  banners.append(u'Some of the commands that are available for use are:')
  banners.append(u'')
  banners.append(frontend_utils.FormatOutputString(
      u'cd key', u'Navigate the Registry like a directory structure.',
      function_name_length))
  banners.append(frontend_utils.FormatOutputString(
      u'ls [-v]', (
          u'List all subkeys and values of a Registry key. If called as '
          u'ls True then values of keys will be included in the output.'),
      function_name_length))
  banners.append(frontend_utils.FormatOutputString(
      u'parse [-v]', u'Parse the current key using all plugins.',
      function_name_length))
  banners.append(frontend_utils.FormatOutputString(
      u'pwd', u'Print the working "directory" or the path of the current key.',
      function_name_length))
  banners.append(frontend_utils.FormatOutputString(
      u'plugin [-h] plugin_name', (
          u'Run a particular key-based plugin on the loaded hive. The correct '
          u'Registry key will be loaded, opened and then parsed.'),
      function_name_length))
  banners.append(frontend_utils.FormatOutputString(
      u'get_value value_name', (
          u'Get a value from the currently loaded Registry key.')))
  banners.append(frontend_utils.FormatOutputString(
      u'get_value_data value_name', (
          u'Get a value data from a value stored in the currently loaded '
          u'Registry key.')))
  banners.append(frontend_utils.FormatOutputString(
      u'get_key', u'Return the currently loaded Registry key.'))
  banners.append(u'')

  # Build the global cache and prepare the tool.
  hive_storage = PregStorage()
  shell_helper = PregHelper(options, front_end, hive_storage)
  parser_context = shell_helper.BuildParserContext()

  PregCache.parser_context = parser_context
  PregCache.shell_helper = shell_helper
  PregCache.hive_storage = hive_storage

  registry_types = getattr(options, 'regfile', None)
  if isinstance(registry_types, basestring):
    registry_types = registry_types.split(u',')

  if not registry_types:
    registry_types = [
        'NTUSER', 'USRCLASS', 'SOFTWARE', 'SYSTEM', 'SAM', 'SECURITY']
  PregCache.shell_helper.Scan(registry_types)

  number_of_hives = len(PregCache.hive_storage)
  if number_of_hives == 1:
    PregCache.hive_storage.SetOpenHive(0)
    hive_helper = PregCache.hive_storage.loaded_hive
    banners.append(
        u'Opening hive: {0:s} [{1:s}]'.format(
            hive_helper.path, hive_helper.collector_name))
    ConsoleConfig.SetPrompt(hive_path=hive_helper.path)

  loaded_hive = PregCache.hive_storage.loaded_hive

  if loaded_hive and loaded_hive.name != u'N/A':
    banners.append(
        u'Registry hive: {0:s} is available and loaded.'.format(
            loaded_hive.name))
  elif not number_of_hives:
    # Without hives there is no listing: ListHives returns None, which
    # cannot be part of the banner.
    banners.append(u'No Registry hives were found.')
  else:
    banners.append(u'More than one Registry file ready for use.')
    banners.append(u'')
    banners.append(PregCache.hive_storage.ListHives())
    banners.append(u'')
    banners.append((
        u'Use "hive open INDEX" to load a hive and "hive list" to see a '
        u'list of available hives.'))

  banners.append(u'')
  banners.append(u'Happy command line console fu-ing.')

  # Adding variables in scope.
  namespace.update(globals())
  namespace.update({
      'get_current_key': GetCurrentKey,
      'get_key': GetCurrentKey,
      'get_value': GetValue,
      'get_value_data': GetValueData,
      'number_of_hives': GetTotalNumberOfLoadedHives,
      'range_of_hives': GetRangeForAllLoadedHives,
      'options': options})

  ipshell_config = ConsoleConfig.GetConfig()

  if loaded_hive:
    ConsoleConfig.SetPrompt(
        hive_path=loaded_hive.name, config=ipshell_config)
  else:
    ConsoleConfig.SetPrompt(hive_path=u'NO HIVE LOADED', config=ipshell_config)

  # Starting the shell.
  ipshell = InteractiveShellEmbed(
      user_ns=namespace, config=ipshell_config, banner1=u'\n'.join(banners),
      exit_msg='')
  ipshell.confirm_exit = False
  # Adding "magic" functions.
  ipshell.register_magics(MyMagics)
  # Set autocall to two, making parenthesis not necessary when calling
  # function names (although they can be used and are necessary sometimes,
  # like in variable assignments, etc).
  ipshell.autocall = 2
  # Registering command completion for the magic commands.
  ipshell.set_hook('complete_command', CdCompleter, str_key='%cd')
  ipshell.set_hook('complete_command', VerboseCompleter, str_key='%ls')
  ipshell.set_hook('complete_command', VerboseCompleter, str_key='%parse')
  ipshell.set_hook('complete_command', PluginCompleter, str_key='%plugin')

  ipshell()
def Main():
  """Run the tool.

  Builds the command line argument parser, parses the arguments and runs the
  front-end in the run mode that follows from the arguments.

  Returns:
    True if the tool ran, or only printed the plugin information (--info),
    False if the options could not be parsed by the front-end.
  """
  output_writer = frontend.StdoutFrontendOutputWriter()
  front_end = PregFrontend(output_writer)

  # The name of the script, as it was invoked, is filled in for {0:s}.
  epilog = textwrap.dedent("""
Example usage:
Parse the SOFTWARE hive from an image:
{0:s} [--vss] [--vss-stores VSS_STORES] -i IMAGE_PATH [-o OFFSET] -c SOFTWARE
Parse an userassist key within an extracted hive:
{0:s} -p userassist MYNTUSER.DAT
Parse the run key from all Registry keys (in vss too):
{0:s} --vss -i IMAGE_PATH [-o OFFSET] -p run
Open up a console session for the SYSTEM hive inside an image:
{0:s} -i IMAGE_PATH [-o OFFSET] -c SYSTEM
""").format(os.path.basename(sys.argv[0]))

  description = textwrap.dedent("""
preg is a simple Windows Registry parser using the plaso Registry
plugins and image parsing capabilities.
It uses the back-end libraries of plaso to read raw image files and
extract Registry files from VSS and restore points and then runs the
Registry plugins of plaso against the Registry hive and presents it
in a textual format.
""")

  # The default help option is disabled, -h is added to the informational
  # options group further down instead.
  arg_parser = argparse.ArgumentParser(
      epilog=epilog, description=description, add_help=False,
      formatter_class=argparse.RawDescriptionHelpFormatter)

  # Create the different argument groups.
  mode_options = arg_parser.add_argument_group(u'Run Mode Options')
  image_options = arg_parser.add_argument_group(u'Image Options')
  info_options = arg_parser.add_argument_group(u'Informational Options')
  additional_data = arg_parser.add_argument_group(u'Additional Options')

  mode_options.add_argument(
      '-c', '--console', dest='console', action='store_true', default=False,
      help=u'Drop into a console session Instead of printing output to STDOUT.')

  additional_data.add_argument(
      '-r', '--restore_points', dest='restore_points', action='store_true',
      default=False, help=u'Include restore points for hive locations.')

  image_options.add_argument(
      '-i', '--image', dest='image', action='store', type=unicode, default='',
      metavar='IMAGE_PATH',
      help=(u'If the Registry file is contained within a storage media image, '
            u'set this option to specify the path of image file.'))

  # The front-end adds its own image related options to the same group.
  front_end.AddImageOptions(image_options)

  info_options.add_argument(
      '-v', '--verbose', dest='verbose', action='store_true', default=False,
      help=u'Print sub key information.')

  info_options.add_argument(
      '-h', '--help', action='help', help=u'Show this help message and exit.')

  # The front-end adds the VSS processing options.
  front_end.AddVssProcessingOptions(additional_data)

  info_options.add_argument(
      '--info', dest='info', action='store_true', default=False,
      help=u'Print out information about supported plugins.')

  mode_options.add_argument(
      '-p', '--plugins', dest='plugin_names', action='append', default=[],
      type=unicode, metavar='PLUGIN_NAME',
      help=(
          u'Substring match of the Registry plugin to be used, this '
          u'parameter can be repeated to create a list of plugins to be '
          u'run against, eg: "-p userassist -p rdp" or "-p userassist".'))

  mode_options.add_argument(
      '-k', '--key', dest='key', action='store', default='', type=unicode,
      metavar='REGISTRY_KEYPATH',
      help=(u'A Registry key path that the tool should parse using all '
            u'available plugins.'))

  arg_parser.add_argument(
      'regfile', action='store', metavar='REGHIVE', nargs='?',
      help=(u'The Registry hive to read key from (not needed if running '
            u'using a plugin)'))

  # Parse the command line arguments.
  options = arg_parser.parse_args()

  # Listing the plugins does not require any of the other options.
  if options.info:
    print front_end.GetListOfAllPlugins()
    return True

  try:
    front_end.ParseOptions(options, source_option='image')
  except errors.BadConfigOption as exception:
    arg_parser.print_usage()
    print u''
    logging.error('{0:s}'.format(exception))
    return False

  # Run the tool, using the run mode according to the options passed
  # to the tool.
  if front_end.run_mode == front_end.RUN_MODE_CONSOLE:
    RunModeConsole(front_end, options)
  if front_end.run_mode == front_end.RUN_MODE_REG_KEY:
    front_end.RunModeRegistryKey(options, options.plugin_names)
  elif front_end.run_mode == front_end.RUN_MODE_REG_PLUGIN:
    front_end.RunModeRegistryPlugin(options, options.plugin_names)
  elif front_end.run_mode == front_end.RUN_MODE_REG_FILE:
    front_end.RunModeRegistryFile(options, options.regfile)

  return True
if __name__ == '__main__':
  # The exit code is 0 if the tool ran successfully and 1 otherwise.
  sys.exit(0 if Main() else 1)