750 lines
24 KiB
Python
750 lines
24 KiB
Python
#!/usr/bin/python
|
||
# -*- coding: utf-8 -*-
|
||
#
|
||
# Copyright 2013 The Plaso Project Authors.
|
||
# Please see the AUTHORS file for details on individual authors.
|
||
#
|
||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
# you may not use this file except in compliance with the License.
|
||
# You may obtain a copy of the License at
|
||
#
|
||
# http://www.apache.org/licenses/LICENSE-2.0
|
||
#
|
||
# Unless required by applicable law or agreed to in writing, software
|
||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
# See the License for the specific language governing permissions and
|
||
# limitations under the License.
|
||
"""This file contains the classes for a scan tree-based format scanner."""
|
||
|
||
import logging
|
||
import os
|
||
|
||
from plaso.classifier import patterns
|
||
from plaso.classifier import range_list
|
||
from plaso.classifier import scan_tree
|
||
|
||
|
||
class _ScanMatch(object):
|
||
"""Class that implements a scan match."""
|
||
|
||
def __init__(self, total_data_offset, pattern):
|
||
"""Initializes the scan result.
|
||
|
||
Args:
|
||
total_data_offset: the offset of the resulting match relative
|
||
to the start of the total data scanned.
|
||
pattern: the pattern matched.
|
||
"""
|
||
super(_ScanMatch, self).__init__()
|
||
self.total_data_offset = total_data_offset
|
||
self.pattern = pattern
|
||
|
||
@property
|
||
def specification(self):
|
||
"""The specification."""
|
||
return self.pattern.specification
|
||
|
||
|
||
class _ScanResult(object):
|
||
"""Class that implements a scan result."""
|
||
|
||
def __init__(self, specification):
|
||
"""Initializes the scan result.
|
||
|
||
Args:
|
||
scan_tree_node: the corresponding scan tree node or None.
|
||
"""
|
||
super(_ScanResult, self).__init__()
|
||
self.specification = specification
|
||
self.scan_matches = []
|
||
|
||
@property
|
||
def identifier(self):
|
||
"""The specification identifier."""
|
||
return self.specification.identifier
|
||
|
||
|
||
class ScanState(object):
|
||
"""Class that implements a scan state."""
|
||
|
||
# The state definitions.
|
||
_SCAN_STATE_START = 1
|
||
_SCAN_STATE_SCANNING = 2
|
||
_SCAN_STATE_STOP = 3
|
||
|
||
def __init__(self, scan_tree_node, total_data_size=None):
|
||
"""Initializes the scan state.
|
||
|
||
Args:
|
||
scan_tree_node: the corresponding scan tree node or None.
|
||
total_data_size: optional value to indicate the total data size.
|
||
The default is None.
|
||
"""
|
||
super(ScanState, self).__init__()
|
||
self._matches = []
|
||
self.remaining_data = None
|
||
self.remaining_data_size = 0
|
||
self.scan_tree_node = scan_tree_node
|
||
self.state = self._SCAN_STATE_START
|
||
self.total_data_offset = 0
|
||
self.total_data_size = total_data_size
|
||
|
||
def AddMatch(self, total_data_offset, pattern):
|
||
"""Adds a result to the state to scanning.
|
||
|
||
Args:
|
||
total_data_offset: the offset of the resulting match relative
|
||
to the start total data scanned.
|
||
pattern: the pattern matched.
|
||
|
||
Raises:
|
||
RuntimeError: when a unsupported state is encountered.
|
||
"""
|
||
if (self.state != self._SCAN_STATE_START and
|
||
self.state != self._SCAN_STATE_SCANNING):
|
||
raise RuntimeError(u'Unsupported scan state.')
|
||
|
||
self._matches.append(_ScanMatch(total_data_offset, pattern))
|
||
|
||
def GetMatches(self):
|
||
"""Retrieves a list containing the results.
|
||
|
||
Returns:
|
||
A list of scan matches (instances of _ScanMatch).
|
||
|
||
Raises:
|
||
RuntimeError: when a unsupported state is encountered.
|
||
"""
|
||
if self.state != self._SCAN_STATE_STOP:
|
||
raise RuntimeError(u'Unsupported scan state.')
|
||
|
||
return self._matches
|
||
|
||
def Reset(self, scan_tree_node):
|
||
"""Resets the state to start.
|
||
|
||
This function will clear the remaining data.
|
||
|
||
Args:
|
||
scan_tree_node: the corresponding scan tree node or None.
|
||
|
||
Raises:
|
||
RuntimeError: when a unsupported state is encountered.
|
||
"""
|
||
if self.state != self._SCAN_STATE_STOP:
|
||
raise RuntimeError(u'Unsupported scan state.')
|
||
|
||
self.remaining_data = None
|
||
self.remaining_data_size = 0
|
||
self.scan_tree_node = scan_tree_node
|
||
self.state = self._SCAN_STATE_START
|
||
|
||
def Scanning(self, scan_tree_node, total_data_offset):
|
||
"""Sets the state to scanning.
|
||
|
||
Args:
|
||
scan_tree_node: the active scan tree node.
|
||
total_data_offset: the offset of the resulting match relative
|
||
to the start of the total data scanned.
|
||
|
||
Raises:
|
||
RuntimeError: when a unsupported state is encountered.
|
||
"""
|
||
if (self.state != self._SCAN_STATE_START and
|
||
self.state != self._SCAN_STATE_SCANNING):
|
||
raise RuntimeError(u'Unsupported scan state.')
|
||
|
||
self.scan_tree_node = scan_tree_node
|
||
self.state = self._SCAN_STATE_SCANNING
|
||
self.total_data_offset = total_data_offset
|
||
|
||
def Stop(self):
|
||
"""Sets the state to stop.
|
||
|
||
Raises:
|
||
RuntimeError: when a unsupported state is encountered.
|
||
"""
|
||
if (self.state != self._SCAN_STATE_START and
|
||
self.state != self._SCAN_STATE_SCANNING):
|
||
raise RuntimeError(u'Unsupported scan state.')
|
||
|
||
self.scan_tree_node = None
|
||
self.state = self._SCAN_STATE_STOP
|
||
|
||
|
||
class ScanTreeScannerBase(object):
|
||
"""Class that implements a scan tree-based scanner base."""
|
||
|
||
def __init__(self, specification_store):
|
||
"""Initializes the scanner.
|
||
|
||
Args:
|
||
specification_store: the specification store (instance of
|
||
SpecificationStore) that contains the format
|
||
specifications.
|
||
"""
|
||
super(ScanTreeScannerBase, self).__init__()
|
||
self._scan_tree = None
|
||
self._specification_store = specification_store
|
||
|
||
def _ScanBufferScanState(
|
||
self, scan_tree_object, scan_state, data, data_size, total_data_offset,
|
||
total_data_size=None):
|
||
"""Scans a buffer using the scan tree.
|
||
|
||
This function implements a Boyer–Moore–Horspool equivalent approach
|
||
in combination with the scan tree.
|
||
|
||
Args:
|
||
scan_tree_object: the scan tree (instance of ScanTree).
|
||
scan_state: the scan state (instance of ScanState).
|
||
data: a buffer containing raw data.
|
||
data_size: the size of the raw data in the buffer.
|
||
total_data_offset: the offset of the data relative to the start of
|
||
the total data scanned.
|
||
total_data_size: optional value to indicate the total data size.
|
||
The default is None.
|
||
|
||
Raises:
|
||
RuntimeError: if the total data offset, total data size or the last
|
||
pattern offset value is out of bounds
|
||
"""
|
||
if total_data_size is not None and total_data_size < 0:
|
||
raise RuntimeError(u'Invalid total data size, value out of bounds.')
|
||
|
||
if total_data_offset < 0 or (
|
||
total_data_size is not None and total_data_offset >= total_data_size):
|
||
raise RuntimeError(u'Invalid total data offset, value out of bounds.')
|
||
|
||
data_offset = 0
|
||
scan_tree_node = scan_state.scan_tree_node
|
||
|
||
if scan_state.remaining_data:
|
||
# str.join() should be more efficient then concatenation by +.
|
||
data = ''.join([scan_state.remaining_data, data])
|
||
data_size += scan_state.remaining_data_size
|
||
|
||
scan_state.remaining_data = None
|
||
scan_state.remaining_data_size = 0
|
||
|
||
if (total_data_size is not None and
|
||
total_data_offset + data_size >= total_data_size):
|
||
match_on_boundary = True
|
||
else:
|
||
match_on_boundary = False
|
||
|
||
while data_offset < data_size:
|
||
if (not match_on_boundary and
|
||
data_offset + scan_tree_object.largest_length >= data_size):
|
||
break
|
||
|
||
found_match = False
|
||
scan_done = False
|
||
|
||
while not scan_done:
|
||
scan_object = scan_tree_node.CompareByteValue(
|
||
data, data_offset, data_size, total_data_offset,
|
||
total_data_size=total_data_size)
|
||
|
||
if isinstance(scan_object, scan_tree.ScanTreeNode):
|
||
scan_tree_node = scan_object
|
||
else:
|
||
scan_done = True
|
||
|
||
if isinstance(scan_object, patterns.Pattern):
|
||
pattern_length = len(scan_object.signature.expression)
|
||
data_last_offset = data_offset + pattern_length
|
||
|
||
if cmp(scan_object.signature.expression,
|
||
data[data_offset:data_last_offset]) == 0:
|
||
|
||
if (not scan_object.signature.is_bound or
|
||
scan_object.signature.offset == data_offset):
|
||
found_match = True
|
||
|
||
logging.debug(
|
||
u'Signature match at data offset: 0x{0:08x}.'.format(
|
||
data_offset))
|
||
|
||
scan_state.AddMatch(total_data_offset + data_offset, scan_object)
|
||
|
||
if found_match:
|
||
skip_value = len(scan_object.signature.expression)
|
||
scan_tree_node = scan_tree_object.root_node
|
||
else:
|
||
last_pattern_offset = (
|
||
scan_tree_object.skip_table.skip_pattern_length - 1)
|
||
|
||
if data_offset + last_pattern_offset >= data_size:
|
||
raise RuntimeError(
|
||
u'Invalid last pattern offset, value out of bounds.')
|
||
skip_value = 0
|
||
|
||
while last_pattern_offset >= 0 and not skip_value:
|
||
last_data_offset = data_offset + last_pattern_offset
|
||
byte_value = ord(data[last_data_offset])
|
||
skip_value = scan_tree_object.skip_table[byte_value]
|
||
last_pattern_offset -= 1
|
||
|
||
if not skip_value:
|
||
skip_value = 1
|
||
|
||
scan_tree_node = scan_tree_object.root_node
|
||
|
||
data_offset += skip_value
|
||
|
||
if not match_on_boundary and data_offset < data_size:
|
||
scan_state.remaining_data = data[data_offset:data_size]
|
||
scan_state.remaining_data_size = data_size - data_offset
|
||
|
||
scan_state.Scanning(scan_tree_node, total_data_offset + data_offset)
|
||
|
||
def _ScanBufferScanStateFinal(self, scan_tree_object, scan_state):
|
||
"""Scans the remaining data in the scan state using the scan tree.
|
||
|
||
Args:
|
||
scan_tree_object: the scan tree (instance of ScanTree).
|
||
scan_state: the scan state (instance of ScanState).
|
||
"""
|
||
if scan_state.remaining_data:
|
||
data = scan_state.remaining_data
|
||
data_size = scan_state.remaining_data_size
|
||
|
||
scan_state.remaining_data = None
|
||
scan_state.remaining_data_size = 0
|
||
|
||
# Setting the total data size will make boundary matches are returned
|
||
# in this scanning pass.
|
||
total_data_size = scan_state.total_data_size
|
||
if total_data_size is None:
|
||
total_data_size = scan_state.total_data_offset + data_size
|
||
|
||
self._ScanBufferScanState(
|
||
scan_tree_object, scan_state, data, data_size,
|
||
scan_state.total_data_offset, total_data_size=total_data_size)
|
||
|
||
scan_state.Stop()
|
||
|
||
def GetScanResults(self, scan_state):
|
||
"""Retrieves the scan results.
|
||
|
||
Args:
|
||
scan_state: the scan state (instance of ScanState).
|
||
|
||
Return:
|
||
A list of scan results (instances of _ScanResult).
|
||
"""
|
||
scan_results = {}
|
||
|
||
for scan_match in scan_state.GetMatches():
|
||
specification = scan_match.specification
|
||
identifier = specification.identifier
|
||
|
||
logging.debug(
|
||
u'Scan match at offset: 0x{0:08x} specification: {1:s}'.format(
|
||
scan_match.total_data_offset, identifier))
|
||
|
||
if identifier not in scan_results:
|
||
scan_results[identifier] = _ScanResult(specification)
|
||
|
||
scan_results[identifier].scan_matches.append(scan_match)
|
||
|
||
return scan_results.values()
|
||
|
||
|
||
class Scanner(ScanTreeScannerBase):
|
||
"""Class that implements a scan tree-based scanner."""
|
||
|
||
_READ_BUFFER_SIZE = 512
|
||
|
||
def __init__(self, specification_store):
|
||
"""Initializes the scanner.
|
||
|
||
Args:
|
||
specification_store: the specification store (instance of
|
||
SpecificationStore) that contains the format
|
||
specifications.
|
||
"""
|
||
super(Scanner, self).__init__(specification_store)
|
||
|
||
def ScanBuffer(self, scan_state, data, data_size):
|
||
"""Scans a buffer.
|
||
|
||
Args:
|
||
scan_state: the scan state (instance of ScanState).
|
||
data: a buffer containing raw data.
|
||
data_size: the size of the raw data in the buffer.
|
||
"""
|
||
self._ScanBufferScanState(
|
||
self._scan_tree, scan_state, data, data_size,
|
||
scan_state.total_data_offset,
|
||
total_data_size=scan_state.total_data_size)
|
||
|
||
def ScanFileObject(self, file_object):
|
||
"""Scans a file-like object.
|
||
|
||
Args:
|
||
file_object: a file-like object.
|
||
|
||
Returns:
|
||
A list of scan results (instances of ScanResult).
|
||
"""
|
||
file_offset = 0
|
||
|
||
if hasattr(file_object, 'get_size'):
|
||
file_size = file_object.get_size()
|
||
else:
|
||
file_object.seek(0, os.SEEK_END)
|
||
file_size = file_object.tell()
|
||
|
||
scan_state = self.StartScan(total_data_size=file_size)
|
||
|
||
file_object.seek(file_offset, os.SEEK_SET)
|
||
|
||
while file_offset < file_size:
|
||
data = file_object.read(self._READ_BUFFER_SIZE)
|
||
data_size = len(data)
|
||
|
||
if data_size == 0:
|
||
break
|
||
|
||
self._ScanBufferScanState(
|
||
self._scan_tree, scan_state, data, data_size, file_offset,
|
||
total_data_size=file_size)
|
||
|
||
file_offset += data_size
|
||
|
||
self.StopScan(scan_state)
|
||
|
||
return self.GetScanResults(scan_state)
|
||
|
||
def StartScan(self, total_data_size=None):
|
||
"""Starts a scan.
|
||
|
||
The function sets up the scanning related structures if necessary.
|
||
|
||
Args:
|
||
total_data_size: optional value to indicate the total data size.
|
||
The default is None.
|
||
Returns:
|
||
A scan state (instance of ScanState).
|
||
|
||
Raises:
|
||
RuntimeError: when total data size is invalid.
|
||
"""
|
||
if total_data_size is not None and total_data_size < 0:
|
||
raise RuntimeError(u'Invalid total data size.')
|
||
|
||
if self._scan_tree is None:
|
||
self._scan_tree = scan_tree.ScanTree(
|
||
self._specification_store, None)
|
||
|
||
return ScanState(self._scan_tree.root_node, total_data_size=total_data_size)
|
||
|
||
def StopScan(self, scan_state):
|
||
"""Stops a scan.
|
||
|
||
Args:
|
||
scan_state: the scan state (instance of ScanState).
|
||
"""
|
||
self._ScanBufferScanStateFinal(self._scan_tree, scan_state)
|
||
|
||
|
||
class OffsetBoundScanner(ScanTreeScannerBase):
|
||
"""Class that implements an offset-bound scan tree-based scanner."""
|
||
|
||
_READ_BUFFER_SIZE = 512
|
||
|
||
def __init__(self, specification_store):
|
||
"""Initializes the scanner.
|
||
|
||
Args:
|
||
specification_store: the specification store (instance of
|
||
SpecificationStore) that contains the format
|
||
specifications.
|
||
"""
|
||
super(OffsetBoundScanner, self).__init__(specification_store)
|
||
self._footer_scan_tree = None
|
||
self._footer_spanning_range = None
|
||
self._header_scan_tree = None
|
||
self._header_spanning_range = None
|
||
|
||
def _GetFooterRange(self, total_data_size):
|
||
"""Retrieves the read buffer aligned footer range.
|
||
|
||
Args:
|
||
total_data_size: optional value to indicate the total data size.
|
||
The default is None.
|
||
Returns:
|
||
A range (instance of Range).
|
||
"""
|
||
# The actual footer range is in reverse since the spanning footer range
|
||
# is based on positive offsets, where 0 is the end of file.
|
||
if self._footer_spanning_range.end_offset < total_data_size:
|
||
footer_range_start_offset = (
|
||
total_data_size - self._footer_spanning_range.end_offset)
|
||
else:
|
||
footer_range_start_offset = 0
|
||
|
||
# Calculate the lower bound modulus of the footer range start offset
|
||
# in increments of the read buffer size.
|
||
footer_range_start_offset /= self._READ_BUFFER_SIZE
|
||
footer_range_start_offset *= self._READ_BUFFER_SIZE
|
||
|
||
# Calculate the upper bound modulus of the footer range size
|
||
# in increments of the read buffer size.
|
||
footer_range_size = self._footer_spanning_range.size
|
||
remainder = footer_range_size % self._READ_BUFFER_SIZE
|
||
footer_range_size /= self._READ_BUFFER_SIZE
|
||
|
||
if remainder > 0:
|
||
footer_range_size += 1
|
||
|
||
footer_range_size *= self._READ_BUFFER_SIZE
|
||
|
||
return range_list.Range(footer_range_start_offset, footer_range_size)
|
||
|
||
def _GetHeaderRange(self):
|
||
"""Retrieves the read buffer aligned header range.
|
||
|
||
Returns:
|
||
A range (instance of Range).
|
||
"""
|
||
# Calculate the lower bound modulus of the header range start offset
|
||
# in increments of the read buffer size.
|
||
header_range_start_offset = self._header_spanning_range.start_offset
|
||
header_range_start_offset /= self._READ_BUFFER_SIZE
|
||
header_range_start_offset *= self._READ_BUFFER_SIZE
|
||
|
||
# Calculate the upper bound modulus of the header range size
|
||
# in increments of the read buffer size.
|
||
header_range_size = self._header_spanning_range.size
|
||
remainder = header_range_size % self._READ_BUFFER_SIZE
|
||
header_range_size /= self._READ_BUFFER_SIZE
|
||
|
||
if remainder > 0:
|
||
header_range_size += 1
|
||
|
||
header_range_size *= self._READ_BUFFER_SIZE
|
||
|
||
return range_list.Range(header_range_start_offset, header_range_size)
|
||
|
||
def _ScanBufferScanState(
|
||
self, scan_tree_object, scan_state, data, data_size, total_data_offset,
|
||
total_data_size=None):
|
||
"""Scans a buffer using the scan tree.
|
||
|
||
This function implements a Boyer–Moore–Horspool equivalent approach
|
||
in combination with the scan tree.
|
||
|
||
Args:
|
||
scan_tree_object: the scan tree (instance of ScanTree).
|
||
scan_state: the scan state (instance of ScanState).
|
||
data: a buffer containing raw data.
|
||
data_size: the size of the raw data in the buffer.
|
||
total_data_offset: the offset of the data relative to the start of
|
||
the total data scanned.
|
||
total_data_size: optional value to indicate the total data size.
|
||
The default is None.
|
||
"""
|
||
scan_done = False
|
||
scan_tree_node = scan_tree_object.root_node
|
||
|
||
while not scan_done:
|
||
data_offset = 0
|
||
|
||
scan_object = scan_tree_node.CompareByteValue(
|
||
data, data_offset, data_size, total_data_offset,
|
||
total_data_size=total_data_size)
|
||
|
||
if isinstance(scan_object, scan_tree.ScanTreeNode):
|
||
scan_tree_node = scan_object
|
||
else:
|
||
scan_done = True
|
||
|
||
if isinstance(scan_object, patterns.Pattern):
|
||
pattern_length = len(scan_object.signature.expression)
|
||
pattern_start_offset = scan_object.signature.offset
|
||
pattern_end_offset = pattern_start_offset + pattern_length
|
||
|
||
if cmp(scan_object.signature.expression,
|
||
data[pattern_start_offset:pattern_end_offset]) == 0:
|
||
scan_state.AddMatch(
|
||
total_data_offset + scan_object.signature.offset, scan_object)
|
||
|
||
logging.debug(
|
||
u'Signature match at data offset: 0x{0:08x}.'.format(data_offset))
|
||
|
||
# TODO: implement.
|
||
# def ScanBuffer(self, scan_state, data, data_size):
|
||
# """Scans a buffer.
|
||
|
||
# Args:
|
||
# scan_state: the scan state (instance of ScanState).
|
||
# data: a buffer containing raw data.
|
||
# data_size: the size of the raw data in the buffer.
|
||
# """
|
||
# # TODO: fix footer scanning logic.
|
||
# # need to know the file size here for the footers.
|
||
|
||
# # TODO: check for clashing ranges?
|
||
|
||
# header_range = self._GetHeaderRange()
|
||
# footer_range = self._GetFooterRange(scan_state.total_data_size)
|
||
|
||
# if self._scan_tree == self._header_scan_tree:
|
||
# if (scan_state.total_data_offset >= header_range.start_offset and
|
||
# scan_state.total_data_offset < header_range.end_offset):
|
||
# self._ScanBufferScanState(
|
||
# self._scan_tree, scan_state, data, data_size,
|
||
# scan_state.total_data_offset,
|
||
# total_data_size=scan_state.total_data_size)
|
||
|
||
# elif scan_state.total_data_offset > header_range.end_offset:
|
||
# # TODO: implement.
|
||
# pass
|
||
|
||
# if self._scan_tree == self._footer_scan_tree:
|
||
# if (scan_state.total_data_offset >= footer_range.start_offset and
|
||
# scan_state.total_data_offset < footer_range.end_offset):
|
||
# self._ScanBufferScanState(
|
||
# self._scan_tree, scan_state, data, data_size,
|
||
# scan_state.total_data_offset,
|
||
# total_data_size=scan_state.total_data_size)
|
||
|
||
def ScanFileObject(self, file_object):
|
||
"""Scans a file-like object.
|
||
|
||
Args:
|
||
file_object: a file-like object.
|
||
|
||
Returns:
|
||
A scan state (instance of ScanState).
|
||
"""
|
||
# TODO: add support for fixed size block-based reads.
|
||
|
||
if hasattr(file_object, 'get_size'):
|
||
file_size = file_object.get_size()
|
||
else:
|
||
file_object.seek(0, os.SEEK_END)
|
||
file_size = file_object.tell()
|
||
|
||
file_offset = 0
|
||
scan_state = self.StartScan(total_data_size=file_size)
|
||
|
||
if self._header_scan_tree.root_node is not None:
|
||
header_range = self._GetHeaderRange()
|
||
|
||
# TODO: optimize the read by supporting fixed size block-based reads.
|
||
# if file_offset < header_range.start_offset:
|
||
# file_offset = header_range.start_offset
|
||
|
||
file_object.seek(file_offset, os.SEEK_SET)
|
||
|
||
# TODO: optimize the read by supporting fixed size block-based reads.
|
||
# data = file_object.read(header_range.size)
|
||
data = file_object.read(header_range.end_offset)
|
||
data_size = len(data)
|
||
|
||
if data_size > 0:
|
||
self._ScanBufferScanState(
|
||
self._scan_tree, scan_state, data, data_size, file_offset,
|
||
total_data_size=file_size)
|
||
|
||
file_offset += data_size
|
||
|
||
if self._footer_scan_tree.root_node is not None:
|
||
self.StopScan(scan_state)
|
||
|
||
self._scan_tree = self._footer_scan_tree
|
||
scan_state.Reset(self._scan_tree.root_node)
|
||
|
||
if self._footer_scan_tree.root_node is not None:
|
||
footer_range = self._GetFooterRange(file_size)
|
||
|
||
# Note that the offset in the footer scan tree start with 0. Make sure
|
||
# the data offset of the data being scanned is aligned with the offset
|
||
# in the scan tree.
|
||
if footer_range.start_offset < self._footer_spanning_range.end_offset:
|
||
data_offset = (
|
||
self._footer_spanning_range.end_offset - footer_range.start_offset)
|
||
else:
|
||
data_offset = 0
|
||
|
||
if file_offset < footer_range.start_offset:
|
||
file_offset = footer_range.start_offset
|
||
|
||
file_object.seek(file_offset, os.SEEK_SET)
|
||
|
||
data = file_object.read(self._READ_BUFFER_SIZE)
|
||
data_size = len(data)
|
||
|
||
if data_size > 0:
|
||
self._ScanBufferScanState(
|
||
self._scan_tree, scan_state, data[data_offset:],
|
||
data_size - data_offset, file_offset + data_offset,
|
||
total_data_size=file_size)
|
||
|
||
self.StopScan(scan_state)
|
||
|
||
return self.GetScanResults(scan_state)
|
||
|
||
def StartScan(self, total_data_size=None):
|
||
"""Starts a scan.
|
||
|
||
The function sets up the scanning related structures if necessary.
|
||
|
||
Args:
|
||
total_data_size: optional value to indicate the total data size.
|
||
The default is None.
|
||
Returns:
|
||
A list of scan results (instances of ScanResult).
|
||
|
||
Raises:
|
||
RuntimeError: when total data size is invalid.
|
||
"""
|
||
if total_data_size is None or total_data_size < 0:
|
||
raise RuntimeError(u'Invalid total data size.')
|
||
|
||
if self._header_scan_tree is None:
|
||
self._header_scan_tree = scan_tree.ScanTree(
|
||
self._specification_store, True,
|
||
offset_mode=scan_tree.ScanTree.OFFSET_MODE_POSITIVE)
|
||
|
||
if self._header_spanning_range is None:
|
||
spanning_range = self._header_scan_tree.range_list.GetSpanningRange()
|
||
self._header_spanning_range = spanning_range
|
||
|
||
if self._footer_scan_tree is None:
|
||
self._footer_scan_tree = scan_tree.ScanTree(
|
||
self._specification_store, True,
|
||
offset_mode=scan_tree.ScanTree.OFFSET_MODE_NEGATIVE)
|
||
|
||
if self._footer_spanning_range is None:
|
||
spanning_range = self._footer_scan_tree.range_list.GetSpanningRange()
|
||
self._footer_spanning_range = spanning_range
|
||
|
||
if self._header_scan_tree.root_node is not None:
|
||
self._scan_tree = self._header_scan_tree
|
||
elif self._footer_scan_tree.root_node is not None:
|
||
self._scan_tree = self._footer_scan_tree
|
||
else:
|
||
self._scan_tree = None
|
||
|
||
if self._scan_tree is not None:
|
||
root_node = self._scan_tree.root_node
|
||
else:
|
||
root_node = None
|
||
|
||
return ScanState(root_node, total_data_size=total_data_size)
|
||
|
||
def StopScan(self, scan_state):
|
||
"""Stops a scan.
|
||
|
||
Args:
|
||
scan_state: the scan state (instance of ScanState).
|
||
"""
|
||
self._ScanBufferScanStateFinal(self._scan_tree, scan_state)
|
||
self._scan_tree = None
|