#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2014 The Plaso Project Authors.
# Please see the AUTHORS file for details on individual authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The file format classifier."""
# TODO: rewrite most of the classifier in C and integrate with the code in:
# plaso/classifier
import gzip
import logging
import os
import tarfile
import zipfile
import zlib
from dfvfs.lib import definitions
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import resolver as path_spec_resolver
from plaso.lib import errors
class Classifier(object):
"""Class that defines the file format classifier."""
_MAGIC_VALUES = {
'ZIP': {'length': 4, 'offset': 0, 'values': ['P', 'K', '\x03', '\x04']},
'TAR': {'length': 5, 'offset': 257, 'values': ['u', 's', 't', 'a', 'r']},
'GZ': {'length': 2, 'offset': 0, 'values': ['\x1f', '\x8b']},
}
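
  # For example, a gzip stream starts with the two bytes '\x1f\x8b' at
  # offset 0, a ZIP archive starts with the local file header signature
  # 'PK\x03\x04', and a tar archive carries the magic 'ustar' at offset
  # 257 of its first header block.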

  # TODO: Remove this logic when the classifier is ready.
  # This is only used temporarily until files can be classified.
  magic_max_length = 0

  # Defines the maximum depth into a file (for SmartOpenFiles).
  MAX_FILE_DEPTH = 3

  @classmethod
  def _SmartOpenFile(cls, file_entry):
    """Returns a generator for all path specifications extracted from a file.

    If the file is compressed then extract all members and include them in
    the processing queue.

    Args:
      file_entry: The file entry object.

    Yields:
      A path specification (instance of dfvfs.PathSpec) of embedded file
      entries.
    """
    file_object = file_entry.GetFileObject()

    # TODO: Remove when classifier gets deployed. Then we
    # call the classifier here and use that for definition (and
    # then we forward the classifier definition in the pathspec
    # protobuf.
    file_object.seek(0, os.SEEK_SET)

    if not cls.magic_max_length:
      for magic_value in cls._MAGIC_VALUES.values():
        cls.magic_max_length = max(
            cls.magic_max_length,
            magic_value['length'] + magic_value['offset'])

    header = file_object.read(cls.magic_max_length)

    file_classification = ''
    # Go over each and every magic value defined and compare
    # each read byte (according to original offset and current one).
    # If all match, then we have a particular file format and we
    # can move on.
    for m_value, m_dict in cls._MAGIC_VALUES.items():
      length = m_dict['length'] + m_dict['offset']
      if len(header) < length:
        continue

      offset = m_dict['offset']
      magic = m_dict['values']

      if header[offset:offset + len(magic)] == ''.join(magic):
        file_classification = m_value
        break

    # TODO: refactor the file type specific code into sub functions.
    if file_classification == 'ZIP':
      try:
        file_object.seek(0, os.SEEK_SET)
        zip_file = zipfile.ZipFile(file_object, 'r')

        # TODO: Make this a more "sane" check, and perhaps not entirely
        # skip the file if it has this particular ending, but for now,
        # this both slows the tool down considerably and makes it also
        # more unstable.
        # Note: rpartition() returns the extension without the leading
        # dot, so the values below are compared without a dot as well.
        _, _, filename_extension = file_entry.name.rpartition(u'.')

        if filename_extension in [u'jar', u'sym', u'xpi']:
          file_object.close()
          logging.debug(
              u'Unsupported ZIP sub type: {0:s} detected in file: {1:s}'.format(
                  filename_extension, file_entry.path_spec.comparable))
          return

        for info in zip_file.infolist():
          if info.file_size > 0:
            logging.debug(
                u'Including: {0:s} from ZIP into process queue.'.format(
                    info.filename))
            yield path_spec_factory.Factory.NewPathSpec(
                definitions.TYPE_INDICATOR_ZIP, location=info.filename,
                parent=file_entry.path_spec)

      except zipfile.BadZipfile:
        pass

    elif file_classification == 'GZ':
      try:
        type_indicator = file_entry.path_spec.type_indicator
        if type_indicator == definitions.TYPE_INDICATOR_GZIP:
          raise errors.SameFileType

        file_object.seek(0, os.SEEK_SET)
        gzip_file = gzip.GzipFile(fileobj=file_object, mode='rb')
        _ = gzip_file.read(4)
        gzip_file.close()

        logging.debug((
            u'Including: {0:s} as GZIP compressed stream into process '
            u'queue.').format(file_entry.name))
        yield path_spec_factory.Factory.NewPathSpec(
            definitions.TYPE_INDICATOR_GZIP, parent=file_entry.path_spec)

      except (IOError, zlib.error, errors.SameFileType):
        pass

    # TODO: Add BZ2 support.
    elif file_classification == 'TAR':
      try:
        file_object.seek(0, os.SEEK_SET)
        tar_file = tarfile.open(fileobj=file_object, mode='r')

        for name_info in tar_file.getmembers():
          if not name_info.isfile():
            continue

          name = name_info.path
          logging.debug(
              u'Including: {0:s} from TAR into process queue.'.format(name))
          yield path_spec_factory.Factory.NewPathSpec(
              definitions.TYPE_INDICATOR_TAR, location=name,
              parent=file_entry.path_spec)

      except tarfile.ReadError:
        pass

    file_object.close()

  @classmethod
  def SmartOpenFiles(cls, file_entry, depth=0):
    """Generates all file entries that can be extracted from a file entry.

    Args:
      file_entry: A file entry object.
      depth: Incrementing number that defines the current depth into
             a file (a file inside a ZIP file is depth 1, a file inside
             a tar.gz would be of depth 2).

    Yields:
      A file entry object (instance of dfvfs.FileEntry).
    """
    if depth >= cls.MAX_FILE_DEPTH:
      return

    for path_spec in cls._SmartOpenFile(file_entry):
      sub_file_entry = path_spec_resolver.Resolver.OpenFileEntry(path_spec)
      if sub_file_entry is None:
        logging.debug(
            u'Unable to open file: {0:s}'.format(path_spec.comparable))
        continue
      yield sub_file_entry

      # Recurse into the embedded file entry one level deeper; the depth
      # used for sibling entries of this loop is left unchanged.
      for sub_sub_file_entry in cls.SmartOpenFiles(
          sub_file_entry, depth=depth + 1):
        yield sub_sub_file_entry
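

# A minimal usage sketch: resolve an OS file entry with dfvfs and let the
# classifier yield any embedded file entries it can extract. The location
# below is a hypothetical example path, not something the module depends on.
if __name__ == '__main__':
  logging.basicConfig(level=logging.DEBUG)

  example_path_spec = path_spec_factory.Factory.NewPathSpec(
      definitions.TYPE_INDICATOR_OS, location=u'/tmp/evidence.tar.gz')
  example_file_entry = path_spec_resolver.Resolver.OpenFileEntry(
      example_path_spec)

  if example_file_entry:
    for example_sub_entry in Classifier.SmartOpenFiles(example_file_entry):
      logging.debug(u'Extracted embedded file: {0:s}'.format(
          example_sub_entry.name))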