plaso-rubanetra/plaso/frontend/pshell.py
2020-04-06 18:48:34 +02:00

499 lines
16 KiB
Python
Executable File

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2013 The Plaso Project Authors.
# Please see the AUTHORS file for details on individual authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This file contains a console, the CLI friendly front-end to plaso."""
import argparse
import logging
import os
import random
import sys
import tempfile
from dfvfs.lib import definitions
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import resolver as path_spec_resolver
try:
# Support version 1.X of IPython.
# pylint: disable=no-name-in-module
from IPython.terminal.embed import InteractiveShellEmbed
except ImportError:
# Support version older than 1.X of IPython.
# pylint: disable=no-name-in-module
from IPython.frontend.terminal.embed import InteractiveShellEmbed
from IPython.config.loader import Config
# pylint: disable=unused-import
from plaso import analysis
from plaso import filters
from plaso import formatters
from plaso import output
from plaso import parsers
from plaso import preprocessors
from plaso.classifier import scanner
from plaso.engine import collector
from plaso.engine import engine
from plaso.engine import queue
from plaso.engine import single_process
from plaso.engine import utils as engine_utils
from plaso.frontend import frontend
from plaso.frontend import utils as frontend_utils
from plaso.lib import binary
from plaso.lib import bufferlib
from plaso.lib import errors
from plaso.lib import event
from plaso.lib import eventdata
from plaso.lib import filter_interface
from plaso.lib import lexer
from plaso.lib import objectfilter
from plaso.lib import output as output_lib
from plaso.lib import pfilter
from plaso.lib import proxy
from plaso.lib import putils
from plaso.lib import registry as class_registry
from plaso.lib import storage
from plaso.lib import timelib
from plaso.lib import utils
from plaso.multi_processing import foreman
from plaso.multi_processing import rpc_proxy
from plaso.multi_processing import process_info
from plaso.output import helper as output_helper
from plaso.parsers import manager as parsers_manager
from plaso.parsers import plugins
from plaso.parsers import text_parser
from plaso.proto import plaso_storage_pb2
from plaso.serializer import interface as serializer_interface
from plaso.serializer import json_serializer
from plaso.serializer import protobuf_serializer
from plaso.unix import bsmtoken
from plaso.winnt import environ_expand
from plaso.winnt import known_folder_ids
from plaso.winreg import cache as win_registry_cache
from plaso.winreg import interface as win_registry_interface
from plaso.winreg import path_expander
from plaso.winreg import utils as win_registry_utils
from plaso.winreg import winpyregf
from plaso.winreg import winregistry
class PshellFrontend(frontend.ExtractionFrontend):
"""Class that implements the pshell front-end."""
_BYTES_IN_A_MIB = 1024 * 1024
def __init__(self):
"""Initializes the front-end object."""
input_reader = frontend.StdinFrontendInputReader()
output_writer = frontend.StdoutFrontendOutputWriter()
super(PshellFrontend, self).__init__(input_reader, output_writer)
def FindAllOutputs():
"""FindAllOutputs() - All available outputs."""
return putils.FindAllOutputs()
def GetEventData(event_proto, before=0):
"""Prints a hexdump of the event data."""
return frontend_utils.OutputWriter.GetEventDataHexDump(event_proto, before)
def GetFileEntryFromEventObject(event_object):
"""Return a file entry object from a pathspec object.
Args:
event_object: An event object (an instance of EventObject).
Returns:
A file entry object (instance of vfs.file_entry.FileEntry) or
None if the event object doesn't have a defined path spec.
"""
path_spec = getattr(event_object, 'pathspec', None)
if not path_spec:
return
return path_spec_resolver.Resolver.OpenFileEntry(path_spec)
def GetParserNames(parser_filter_string=None):
"""Retrieves the parser names.
Args:
parser_filter_string: Optional parser filter string. The default is None.
Returns:
A list of parser names.
"""
return parsers_manager.ParsersManager.GetParserNames(
parser_filter_string=parser_filter_string)
def GetParserObjects(parser_filter_string=None):
"""Retrieves the parser objects.
Args:
parser_filter_string: Optional parser filter string. The default is None.
Returns:
A list of parser objects (instances of BaseParser).
"""
return parsers_manager.ParsersManager.GetParserObjects(
parser_filter_string=parser_filter_string)
def OpenOSFile(path):
"""Opens a file entry from the OS."""
if not os.path.isfile(path):
logging.error(u'File: {0:s} does not exist.'.format(path))
return
path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_OS, location=path)
return path_spec_resolver.Resolver.OpenFileEntry(path_spec)
def OpenStorageFile(storage_path):
"""Opens a storage file and returns the storage file object."""
if not os.path.isfile(storage_path):
return
try:
store = storage.StorageFile(storage_path, read_only=True)
except IOError:
print 'Unable to load storage file, not a storage file?'
return store
def OpenTskFile(image_path, image_offset, path=None, inode=None):
"""Opens a file entry of a file inside an image file."""
path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_OS, location=image_path)
if image_offset > 0:
volume_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_TSK_PARTITION, start_offset=image_offset,
parent=path_spec)
else:
volume_path_spec = path_spec
if inode is not None:
if path is None:
path = u''
path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_TSK, inode=inode, location=path,
parent=volume_path_spec)
else:
path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_TSK, location=path, parent=volume_path_spec)
return path_spec_resolver.Resolver.OpenFileEntry(path_spec)
def OpenVssFile(path, image_path, store_number, image_offset):
"""Opens a file entry inside a VSS inside an image file."""
path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_OS, location=image_path)
if image_offset > 0:
volume_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_TSK_PARTITION, start_offset=image_offset,
parent=path_spec)
else:
volume_path_spec = path_spec
store_number -= 1
path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_VSHADOW, store_index=store_number,
parent=volume_path_spec)
path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_TSK, location=path, parent=path_spec)
return path_spec_resolver.Resolver.OpenFileEntry(path_spec)
def ParseFile(file_entry):
"""Parse a file given a file entry or path and return a list of results.
Args:
file_entry: Either a file entry object (instance of dfvfs.FileEntry)
or a string containing a path (absolute or relative) to a
local file.
Returns:
A list of event object (instance of EventObject) that were extracted from
the file (or an empty list if no events were extracted).
"""
if not file_entry:
return
if isinstance(file_entry, basestring):
file_entry = OpenOSFile(file_entry)
# Set up the engine.
# TODO: refactor and add queue limit.
collection_queue = single_process.SingleProcessQueue()
storage_queue = single_process.SingleProcessQueue()
parse_error_queue = single_process.SingleProcessQueue()
engine_object = engine.BaseEngine(
collection_queue, storage_queue, parse_error_queue)
# Create a worker.
worker_object = engine_object.CreateExtractionWorker(0)
# TODO: add support for parser_filter_string.
worker_object.InitalizeParserObjects()
worker_object.ParseFileEntry(file_entry)
collection_queue.SignalEndOfInput()
engine_object.SignalEndOfInputStorageQueue()
results = []
while True:
try:
item = storage_queue.PopItem()
except errors.QueueEmpty:
break
if isinstance(item, queue.QueueEndOfInput):
break
results.append(item)
return results
def Pfile2File(file_object, path):
"""Saves a file-like object to the path."""
return frontend_utils.OutputWriter.WriteFile(file_object, path)
def PrintTimestamp(timestamp):
"""Prints a human readable timestamp from a timestamp value."""
return frontend_utils.OutputWriter.GetDateTimeString(timestamp)
def PrintTimestampFromEvent(event_object):
"""Prints a human readable timestamp from values stored in an event object."""
return PrintTimestamp(getattr(event_object, 'timestamp', 0))
def Main():
"""Start the tool."""
temp_location = tempfile.gettempdir()
options = putils.Options()
# Set the default options.
options.buffer_size = 0
options.debug = False
options.filename = '.'
options.file_filter = ''
options.filter = ''
options.image = False
options.image_offset = None
options.image_offset_bytes = None
options.old_preprocess = False
options.open_files = False
options.output = os.path.join(temp_location, 'wheredidmytimelinego.dump')
options.output_module = ''
options.parsers = ''
options.parse_vss = False
options.preprocess = False
options.recursive = False
options.single_process = False
options.timezone = 'UTC'
options.workers = 5
format_str = '[%(levelname)s] (%(processName)-10s) %(message)s'
logging.basicConfig(format=format_str)
front_end = PshellFrontend()
try:
front_end.ParseOptions(options, source_option='filename')
front_end.SetStorageFile(options.output)
except errors.BadConfigOption as exception:
logging.error(u'{0:s}'.format(exception))
# TODO: move to frontend object.
if options.image and options.image_offset_bytes is None:
if options.image_offset is not None:
bytes_per_sector = getattr(options, 'bytes_per_sector', 512)
options.image_offset_bytes = options.image_offset * bytes_per_sector
else:
options.image_offset_bytes = 0
namespace = {}
pre_obj = event.PreprocessObject()
namespace.update(globals())
namespace.update({
'frontend': front_end,
'pre_obj': pre_obj,
'options': options,
'find_all_output': FindAllOutputs,
'parse_file': ParseFile,
'timestamp_from_event': PrintTimestampFromEvent,
'message': formatters.manager.EventFormatterManager.GetMessageStrings})
# Include few random phrases that get thrown in once the user exists the
# shell.
_my_random_phrases = [
u'I haven\'t seen timelines like this since yesterday.',
u'Timelining is super relaxing.',
u'Why did I not use the shell before?',
u'I like a do da cha cha',
u'I AM the Shogun of Harlem!',
(u'It doesn\'t matter if you win or lose, it\'s what you do with your '
u'dancin\' shoes'),
u'I have not had a night like that since the seventies.',
u'Baker Team. They\'re all dead, sir.',
(u'I could have killed \'em all, I could\'ve killed you. In town '
u'you\'re the law, out here it\'s me.'),
(u'Are you telling me that 200 of our men against your boy is a no-win '
u'situation for us?'),
u'Hunting? We ain\'t huntin\' him, he\'s huntin\' us!',
u'You picked the wrong man to push',
u'Live for nothing or die for something',
u'I am the Fred Astaire of karate.',
(u'God gave me a great body and it\'s my duty to take care of my '
u'physical temple.'),
u'This maniac should be wearing a number, not a badge',
u'Imagination is more important than knowledge.',
u'Do you hate being dead?',
u'You\'ve got 5 seconds... and 3 are up.',
u'He is in a gunfight right now. I\'m gonna have to take a message',
u'That would be better than losing your teeth',
u'The less you know, the more you make',
(u'A SQL query goes into a bar, walks up to two tables and asks, '
u'"Can I join you?"'),
u'This is your captor speaking.',
(u'If I find out you\'re lying, I\'ll come back and kill you in your '
u'own kitchen.'),
u'That would be better than losing your teeth',
(u'He\'s the kind of guy who would drink a gallon of gasoline so '
u'that he can p*ss into your campfire.'),
u'I\'m gonna take you to the bank, Senator Trent. To the blood bank!',
u'I missed! I never miss! They must have been smaller than I thought',
u'Nah. I\'m just a cook.',
u'Next thing I know, you\'ll be dating musicians.',
u'Another cold day in hell',
u'Yeah, but I bet you she doesn\'t see these boys in the choir.',
u'You guys think you\'re above the law... well you ain\'t above mine!',
(u'One thought he was invincible... the other thought he could fly... '
u'They were both wrong'),
u'To understand what recursion is, you must first understand recursion']
arg_description = (
u'pshell is the interactive session tool that can be used to'
u'MISSING')
arg_parser = argparse.ArgumentParser(description=arg_description)
arg_parser.add_argument(
'-s', '--storage_file', '--storage-file', dest='storage_file',
type=unicode, default=u'', help=u'Path to a plaso storage file.',
action='store', metavar='PATH')
configuration = arg_parser.parse_args()
if configuration.storage_file:
store = OpenStorageFile(configuration.storage_file)
if store:
namespace.update({'store': store})
functions = [
FindAllOutputs, GetEventData, GetParserNames, GetParserObjects,
OpenOSFile, OpenStorageFile, OpenTskFile, OpenVssFile,
ParseFile, Pfile2File,
PrintTimestamp, PrintTimestampFromEvent]
functions_strings = []
for function in functions:
docstring, _, _ = function.__doc__.partition(u'\n')
docstring = u'\t{0:s} - {1:s}'.format(function.__name__, docstring)
functions_strings.append(docstring)
functions_strings = u'\n'.join(functions_strings)
banner = (
u'--------------------------------------------------------------\n'
u' Welcome to Plaso console - home of the Plaso adventure land.\n'
u'--------------------------------------------------------------\n'
u'This is the place where everything is allowed, as long as it is '
u'written in Python.\n\n'
u'Objects available:\n\toptions - set of options to the frontend.\n'
u'\tfrontend - A copy of the pshell frontend.\n'
u'\n'
u'All libraries have been imported and can be used, see help(frontend) '
u'or help(parser).\n'
u'\n'
u'Base methods:\n'
u'{0:s}'
u'\n\tmessage - Print message strings from an event object.'
u'\n'
u'\n'
u'p.s. typing in "pdb" and pressing enter puts the shell in debug'
u'mode which causes all exceptions being sent to pdb.\n'
u'Happy command line console fu-ing.\n\n').format(functions_strings)
exit_message = u'You are now leaving the winter wonderland.\n\n{}'.format(
random.choice(_my_random_phrases))
shell_config = Config()
# Make slight adjustments to the iPython prompt.
shell_config.PromptManager.out_template = (
r'{color.Normal}[{color.Red}\#{color.Normal}]<<< ')
shell_config.PromptManager.in_template = (
r'[{color.LightBlue}\T{color.Normal}] {color.LightPurple}\Y2\n'
r'{color.Normal}[{color.Red}\#{color.Normal}] \$ ')
shell_config.PromptManager.in2_template = r'.\D.>>>'
ipshell = InteractiveShellEmbed(
user_ns=namespace, config=shell_config, banner1=banner,
exit_msg=exit_message)
ipshell.confirm_exit = False
# Set autocall to two, making parenthesis not necessary when calling
# function names (although they can be used and are necessary sometimes,
# like in variable assignments, etc).
ipshell.autocall = 2
ipshell()
return True
if __name__ == '__main__':
if not Main():
sys.exit(1)
else:
sys.exit(0)