#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright 2013 The Plaso Project Authors.
# Please see the AUTHORS file for details on individual authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Extract search history from a plaso storage file and enjoy a cup of tea.
A very simple script that takes as an input a plaso storage file
and then tries to extract common search engine history from it and spit
it out to your lovely little screen or a file of your choosing.
"""
import argparse
import locale
import logging
import os
import sys
import urllib
# pylint: disable=unused-import
from plaso import filters
from plaso import formatters
from plaso.lib import output
from plaso.lib import storage
# Each entry pairs a plaso event filter expression (matched against web
# history events) with the name of the FilterClass classmethod that extracts
# the search terms from the matching event's URL attribute.
FILTERS = (
    (('source is "WEBHIST" and url iregexp "(www.|encrypted.|/)google." and '
      'url contains "search"'), 'GoogleSearch'),
    ('source is "WEBHIST" and url contains "youtube.com"', 'YouTube'),
    (('source is "WEBHIST" and url contains "bing.com" and url contains '
      '"search"'), 'BingSearch'),
    ('source is "WEBHIST" and url contains "mail.google.com"', 'Gmail'),
    (('source is "WEBHIST" and url contains "yandex.com" and url contains '
      '"yandsearch"'), 'Yandex'),
    ('source is "WEBHIST" and url contains "duckduckgo.com"', 'DuckDuckGo')
)
def ScrubLine(line):
  """Decode %XX percent-escapes in a line of URL text.

  Replaces every %XX hex escape with the byte it encodes and decodes
  the result as UTF-8.

  Args:
    line: The string to decode (may be None or empty).

  Returns:
    The decoded string; the original line when it contains no percent
    signs or cannot be decoded as UTF-8; an empty string for falsy input.
  """
  if not line:
    return ''
  if '%' not in line:
    return line
  scrubbed = line
  try:
    scrubbed = unicode(urllib.unquote(str(line)), 'utf-8')
  except UnicodeDecodeError:
    # Best effort: keep the raw line rather than dropping the event.
    logging.warning(u'Unable to decode line: {0:s}'.format(line))
  return scrubbed
class FilterClass(object):
  """Container for the URL search-term extraction callbacks.

  Each public classmethod named in FILTERS takes a URL string and returns
  the extracted (still percent-encoded) search terms with '+' swapped for
  spaces, or None/'' when the URL contains no recognizable query.
  """

  @classmethod
  def _GetBetweenQEqualsAndAmpersand(cls, string):
    """Return the substring between 'q=' and the following '&'.

    Args:
      string: The URL (or URL fragment) to parse.

    Returns:
      The value of the q= parameter up to the next '&' (first whitespace
      separated token only); the remainder after 'q=' when no value
      precedes an '&'; the input unchanged when it contains no 'q=';
      an empty string for a whitespace-only value.
    """
    if 'q=' not in string:
      return string
    _, _, remainder = string.partition('q=')
    value, _, _ = remainder.partition('&')
    if not value:
      return remainder
    # Guard against a whitespace-only value, which previously raised
    # IndexError on split()[0].
    tokens = value.split()
    return tokens[0] if tokens else ''

  @classmethod
  def _SearchAndQInLine(cls, string):
    """Return True when both 'search' and 'q=' appear in string."""
    return 'search' in string and 'q=' in string

  @classmethod
  def GoogleSearch(cls, url):
    """Return the search terms extracted from a Google search URL."""
    # Google uses the generic search?q= pattern; reuse the generic parser
    # instead of duplicating it.
    return cls.GenericSearch(url)

  @classmethod
  def YouTube(cls, url):
    """Return the search terms extracted from a YouTube URL."""
    return cls.GenericSearch(url)

  @classmethod
  def BingSearch(cls, url):
    """Return the search terms extracted from a Bing search URL."""
    return cls.GenericSearch(url)

  @classmethod
  def GenericSearch(cls, url):
    """Return the search terms from a generic 'search...q=' style URL."""
    if not cls._SearchAndQInLine(url):
      return
    return cls._GetBetweenQEqualsAndAmpersand(url).replace('+', ' ')

  @classmethod
  def Yandex(cls, url):
    """Return the search terms from a Yandex 'text=' search URL."""
    if 'text=' not in url:
      return
    _, _, remainder = url.partition('text=')
    value, _, _ = remainder.partition('&')
    if not value:
      return
    # Same whitespace-only guard as _GetBetweenQEqualsAndAmpersand.
    tokens = value.split()
    if not tokens:
      return
    return tokens[0].replace('+', ' ')

  @classmethod
  def DuckDuckGo(cls, url):
    """Return the search terms extracted from a DuckDuckGo URL."""
    if 'q=' not in url:
      return
    return cls._GetBetweenQEqualsAndAmpersand(url).replace('+', ' ')

  @classmethod
  def Gmail(cls, url):
    """Return the search terms extracted from a Gmail search URL."""
    if 'search/' not in url:
      return
    _, _, remainder = url.partition('search/')
    segment, _, _ = remainder.partition('/')
    terms, _, _ = segment.partition('?compose')
    return terms.replace('+', ' ')
def Main():
  """Run the tool.

  Parses the command line, builds the search engine filters, runs every
  event in the storage file through them and writes per-engine search
  term counts (most frequent first) to stdout or to the --write file.

  Raises:
    RuntimeError: if the supplied storage file does not exist.
  """
  arg_parser = argparse.ArgumentParser(
      description=(
          'plaso_extract_search_history is a simple script that reads the '
          'content of a plaso storage file and tries to extract known search '
          'engine history from it'))
  arg_parser.add_argument(
      '-w', '--write', metavar='FILENAME', action='store', dest='output_file',
      default='', help='Write results to a file.')
  arg_parser.add_argument(
      'filename', action='store', metavar='STORAGE_FILE', help=(
          'The path to the plaso storage file.'))
  options = arg_parser.parse_args()

  preferred_encoding = locale.getpreferredencoding()
  if preferred_encoding.lower() == 'ascii':
    preferred_encoding = 'utf-8'

  if not os.path.isfile(options.filename):
    raise RuntimeError(u'File {} does not exist'.format(options.filename))

  # Per-engine mapping of extracted search term to occurrence count.
  # A nested dict replaces the previous parallel list + composite
  # 'engine:term' keyed dict, and makes the per-event duplicate check O(1)
  # instead of an O(n) list scan.
  results = {}

  output_filehandle = output.OutputFilehandle(preferred_encoding)
  if options.output_file:
    output_filehandle.Open(path=options.output_file)
  else:
    output_filehandle.Open(sys.stdout)

  # Build filter objects and pair each with its extraction callback.
  filter_dict = {}
  for filter_str, call_back in FILTERS:
    filter_obj = filters.GetFilter(filter_str)
    call_back_obj = getattr(FilterClass, call_back, None)
    results[call_back] = {}
    if filter_obj and call_back_obj:
      filter_dict[filter_obj] = (call_back, call_back_obj)

  with storage.StorageFile(options.filename, read_only=True) as store:
    event_object = store.GetSortedEntry()
    while event_object:
      for filter_obj, call_backs in filter_dict.items():
        call_back_name, call_back_object = call_backs
        if not filter_obj.Match(event_object):
          continue
        url_attribute = getattr(event_object, 'url', None)
        if not url_attribute:
          continue
        ret_line = ScrubLine(call_back_object(url_attribute))
        if not ret_line:
          continue
        engine_counts = results[call_back_name]
        engine_counts[ret_line] = engine_counts.get(ret_line, 0) + 1
      event_object = store.GetSortedEntry()

  for engine_name, engine_counts in results.items():
    header = u' == ENGINE: {0:s} ==\n'.format(engine_name)
    output_filehandle.WriteLine(header)
    # Sort on (count, term) tuples in reverse, matching the previous
    # output order: highest count first, ties broken by term descending.
    count_term_pairs = [
        (count, term) for term, count in engine_counts.items()]
    for count, result in sorted(count_term_pairs, reverse=True):
      output_filehandle.WriteLine(u'{} {}\n'.format(count, result))
    output_filehandle.WriteLine('\n')


if __name__ == '__main__':
  Main()