#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2012 The Plaso Project Authors.
# Please see the AUTHORS file for details on individual authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The storage mechanism.

The storage mechanism can be described as a collection of storage files
that are stored together in a single ZIP compressed container.

The storage file is essentially split into two categories:
+ A store file (further described below).
+ Other files; these contain grouping information, tags, collection
  information or other metadata describing the content of the store files.

The store itself is a collection of four files:
  plaso_meta.<store_number>
  plaso_proto.<store_number>
  plaso_index.<store_number>
  plaso_timestamps.<store_number>

The plaso_proto file within each store contains several EventObjects
serialized as protobufs. All of the EventObjects within a plaso_proto file
are fully sorted by time; however, since the storage container can contain
more than one store, the overall storage is not fully sorted.

The other files that make up the store are:

+ plaso_meta

A simple text file that uses YAML to store metadata about the store,
for example:
  variable: value
  a_list: [value, value, value]

This can be used to filter out which proto files should be included
in processing.

+ plaso_index

The index file contains an index to all the entries stored within
the protobuf file, so that entries can be seeked to efficiently.
The layout is:

+-----+-----+-...+
| int | int | ...|
+-----+-----+-...+

Where int is an unsigned integer '<I' that represents the byte offset
into the .proto file where the beginning of the size variable lies.

This can be used to seek the proto file directly to read a particular
entry within the proto file.

+ plaso_timestamps

This is a simple file that contains all the timestamp values of the entries
within the proto file. Each entry is a long integer ('<q') that contains the
timestamp value of the EventObject stored at the same index in the proto file.

The structure is:
+-----------+-----------+-...-+
| timestamp | timestamp | ... |
+-----------+-----------+-...-+

This is used for time based filtering: if the 15th entry in this file is the
first entry that is larger than the lower bound, the index file is used to
seek to the 15th entry inside the proto file.

+ plaso_proto

The structure of a proto file is:
+------+---------------------------------+------+------...+
| size | protobuf (plaso_storage_proto)  | size | proto...|
+------+---------------------------------+------+------...+

For further details about the storage design see:
  http://plaso.kiddaland.net/developer/libraries/storage
"""
# TODO: Go through the storage library to see if it can be split in two, one
# part which will define the storage itself, and can be relatively independent.
# Independent enough to be split into a separate project to ease integration
# with other tools. This file will then contain the queueing mechanism and
# other plaso-specific mechanisms, making it easier to import the storage
# library.

import collections
import construct
import heapq
import logging
# TODO: replace all instances of struct by construct!
import struct
import sys
import zipfile

from google.protobuf import message
import yaml

from plaso.engine import queue
from plaso.lib import errors
from plaso.lib import event
from plaso.lib import limit
from plaso.lib import pfilter
from plaso.lib import output
from plaso.lib import timelib
from plaso.lib import utils
from plaso.proto import plaso_storage_pb2
from plaso.serializer import json_serializer
from plaso.serializer import protobuf_serializer

class _EventTagIndexValue(object):
  """Class that defines the event tag index value."""

  TAG_STORE_STRUCT = construct.Struct(
      'tag_store',
      construct.ULInt32('store_number'),
      construct.ULInt32('store_index'))

  TAG_UUID_STRUCT = construct.Struct(
      'tag_uuid',
      construct.PascalString('event_uuid'))

  TAG_INDEX_STRUCT = construct.Struct(
      'tag_index',
      construct.Byte('type'),
      construct.ULInt32('offset'),
      construct.IfThenElse(
          'tag',
          lambda ctx: ctx['type'] == 1,
          TAG_STORE_STRUCT,
          TAG_UUID_STRUCT))

  TAG_TYPE_UNDEFINED = 0
  TAG_TYPE_NUMERIC = 1
  TAG_TYPE_UUID = 2

  def __init__(self, identifier, store_number=0, store_offset=0):
    """Initializes the tag index value.

    Args:
      identifier: the identifier string.
      store_number: optional store number. The default is 0.
      store_offset: optional offset relative to the start of the store.
                    The default is 0.
    """
    super(_EventTagIndexValue, self).__init__()
    self.identifier = identifier
    self.store_number = store_number
    self.store_offset = store_offset

  @classmethod
  def Read(cls, file_object, store_number):
    """Reads a tag index value from the file-like object.

    Args:
      file_object: the file-like object to read from.
      store_number: the store number.

    Returns:
      The tag index value if successful or None.
    """
    try:
      tag_index_struct = cls.TAG_INDEX_STRUCT.parse_stream(file_object)
    except (construct.FieldError, AttributeError):
      return None

    tag_type = tag_index_struct.get('type', cls.TAG_TYPE_UNDEFINED)
    if tag_type not in [cls.TAG_TYPE_NUMERIC, cls.TAG_TYPE_UUID]:
      logging.warning('Unsupported tag type: {0:d}'.format(tag_type))
      return None

    tag_entry = tag_index_struct.get('tag', {})
    if tag_type == cls.TAG_TYPE_NUMERIC:
      tag_identifier = '{}:{}'.format(
          tag_entry.get('store_number', 0),
          tag_entry.get('store_index', 0))

    else:
      tag_identifier = tag_entry.get('event_uuid', '0')

    store_offset = tag_index_struct.get('offset')
    return _EventTagIndexValue(
        tag_identifier, store_number=store_number, store_offset=store_offset)
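
# For reference, the tag index records read by _EventTagIndexValue.Read() can
# also be decoded with plain struct calls. The sketch below is illustrative
# only and is not used by this module; it assumes construct's PascalString
# default of a single length byte preceding the UUID string.
def _ExampleParseTagIndexRecord(file_object):
  """Illustrative sketch: parses one tag index record from a file object."""
  import struct

  header = file_object.read(5)
  if len(header) != 5:
    return None

  # The record starts with a one byte type followed by a 32-bit offset.
  tag_type, offset = struct.unpack('<BI', header)
  if tag_type == _EventTagIndexValue.TAG_TYPE_NUMERIC:
    store_number, store_index = struct.unpack('<II', file_object.read(8))
    return offset, '{0:d}:{1:d}'.format(store_number, store_index)

  if tag_type == _EventTagIndexValue.TAG_TYPE_UUID:
    uuid_length = struct.unpack('<B', file_object.read(1))[0]
    return offset, file_object.read(uuid_length)

  return None
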

class StorageFile(object):
  """Class that defines the storage file."""

  _STREAM_DATA_SEGMENT_SIZE = 1024

  # Set the maximum buffer size to 196 MiB
  MAX_BUFFER_SIZE = 196 * 1024 * 1024

  # Set the maximum protobuf string size to 40 MiB
  MAX_PROTO_STRING_SIZE = 40 * 1024 * 1024

  # Set the maximum report protobuf string size to 24 MiB
  MAX_REPORT_PROTOBUF_SIZE = 24 * 1024 * 1024

  # Set the version of this storage mechanism.
  STORAGE_VERSION = 1

  # Define structs.
  INTEGER = construct.ULInt32('integer')

  source_short_map = {}
  for value in plaso_storage_pb2.EventObject.DESCRIPTOR.enum_types_by_name[
      'SourceShort'].values:
    source_short_map[value.name] = value.number

  def __init__(
      self, output_file, buffer_size=0, read_only=False, pre_obj=None,
      serializer_format='proto'):
    """Initializes the storage file.

    Args:
      output_file: The name of the output file.
      buffer_size: Optional maximum size of a single storage (protobuf) file.
                   The default is 0, which indicates no limit.
      read_only: Optional boolean to indicate we are opening the storage file
                 for reading only. The default is false.
      pre_obj: Optional preprocessing object that gets stored inside
               the storage file. The default is None.
      serializer_format: A string containing either "proto" or "json". The
                         default is proto.

    Raises:
      IOError: if we open up the file in read only mode and the file does
               not exist.
    """
    # bound_first and bound_last contain timestamps and None is used
    # to indicate not set.
    self._bound_first = None
    self._bound_last = None
    self._buffer = []
    self._buffer_first_timestamp = sys.maxint
    self._buffer_last_timestamp = 0
    self._buffer_size = 0
    self._event_object_serializer = None
    self._event_serializer_format_string = u''
    self._event_tag_index = None
    self._file_open = False
    self._file_number = 1
    self._first_file_number = None
    self._max_buffer_size = buffer_size or self.MAX_BUFFER_SIZE
    self._output_file = output_file
    self._pre_obj = pre_obj
    self._proto_streams = {}
    self._read_only = None
    self._write_counter = 0

    self._analysis_report_serializer = (
        protobuf_serializer.ProtobufAnalysisReportSerializer)
    self._event_tag_serializer = (
        protobuf_serializer.ProtobufEventTagSerializer)
    self._pre_obj_serializer = (
        protobuf_serializer.ProtobufPreprocessObjectSerializer)
    self._SetEventObjectSerializer(serializer_format)

    self._Open(read_only)

    # Add information about the serializer used in the storage.
    if not read_only and not self._OpenStream('serializer.txt'):
      self._WriteStream('serializer.txt', self._event_serializer_format_string)

  def GetLastPreprocessObject(self):
    """Return the last pre-processing object from the storage file if possible.

    Returns:
      The last stored pre-processing object (instance of
      event.PreprocessObject). If it is neither found nor stored, returns
      None.
    """
    if self._pre_obj:
      return self._pre_obj

    list_of_pre_objs = self.GetStorageInformation()
    if not list_of_pre_objs:
      return

    # We want the last saved pre-processing object.
    pre_obj = list_of_pre_objs[-1]
    if pre_obj:
      return pre_obj

  def _Open(self, read_only=False):
    """Opens the storage file.

    Args:
      read_only: Optional boolean to indicate we are opening the storage file
                 for reading only. The default is false.

    Raises:
      IOError: if we open up the file in read only mode and the file does
               not exist.
    """
    if read_only:
      access_mode = 'r'
    else:
      access_mode = 'a'

    try:
      self._zipfile = zipfile.ZipFile(
          self._output_file, access_mode, zipfile.ZIP_DEFLATED)
    except zipfile.BadZipfile as exception:
      raise IOError(u'Unable to read ZIP file with error: {0:s}'.format(
          exception))

    self._file_open = True
    self._read_only = read_only

    # Read the serializer string (if available).
    serializer = self._ReadStream('serializer.txt')
    if serializer:
      self._SetEventObjectSerializer(serializer)

    if not self._read_only:
      logging.debug(u'Writing to ZIP file with buffer size: {0:d}'.format(
          self._max_buffer_size))

      if self._pre_obj:
        self._pre_obj.counter = collections.Counter()
        self._pre_obj.plugin_counter = collections.Counter()
        if hasattr(self._pre_obj, 'collection_information'):
          cmd_line = ' '.join(sys.argv)
          encoding = getattr(self._pre_obj, 'preferred_encoding', None)
          if encoding:
            try:
              cmd_line = cmd_line.decode(encoding)
            except UnicodeDecodeError:
              pass
          self._pre_obj.collection_information['cmd_line'] = cmd_line

      # Start up a counter for modules in buffer.
      self._count_data_type = collections.Counter()
      self._count_parser = collections.Counter()

      # Need to get the last number in the list.
      for stream_name in self._GetStreamNames():
        if stream_name.startswith('plaso_meta.'):
          _, _, file_number = stream_name.partition('.')

          try:
            file_number = int(file_number, 10)
            if file_number >= self._file_number:
              self._file_number = file_number + 1
          except ValueError:
            # Ignore invalid metadata stream names.
            pass

      self._first_file_number = self._file_number

  def __enter__(self):
    """Make usable with "with" statement."""
    return self

  def __exit__(self, unused_type, unused_value, unused_traceback):
    """Make usable with "with" statement."""
    self.Close()

  def _BuildTagIndex(self):
    """Builds the tag index that contains the offsets for each tag.

    Raises:
      IOError: if the stream cannot be opened.
    """
    self._event_tag_index = {}

    for stream_name in self._GetStreamNames():
      if not stream_name.startswith('plaso_tag_index.'):
        continue

      file_object = self._OpenStream(stream_name, 'r')
      if file_object is None:
        raise IOError(u'Unable to open stream: {0:s}'.format(stream_name))

      _, _, store_number = stream_name.rpartition('.')
      # TODO: catch exception.
      store_number = int(store_number, 10)

      while True:
        tag_index_value = _EventTagIndexValue.Read(
            file_object, store_number)
        if tag_index_value is None:
          break

        self._event_tag_index[tag_index_value.identifier] = tag_index_value

  def _FlushBuffer(self):
    """Flushes the buffered streams to disk."""
    if not self._buffer_size:
      return

    yaml_dict = {
        'range': (self._buffer_first_timestamp, self._buffer_last_timestamp),
        'version': self.STORAGE_VERSION,
        'data_type': list(self._count_data_type.viewkeys()),
        'parsers': list(self._count_parser.viewkeys()),
        'count': len(self._buffer),
        'type_count': self._count_data_type.most_common()}
    self._count_data_type = collections.Counter()
    self._count_parser = collections.Counter()

    stream_name = 'plaso_meta.{0:06d}'.format(self._file_number)
    self._WriteStream(stream_name, yaml.safe_dump(yaml_dict))

    ofs = 0
    proto_str = []
    index_str = []
    timestamp_str = []
    for _ in range(len(self._buffer)):
      timestamp, entry = heapq.heappop(self._buffer)
      # TODO: Instead of appending to an array
      # which is not optimal (loads up the entire max file
      # size into memory) Zipfile should be extended to
      # allow appending to files (implement lock).
      try:
        # Appending a timestamp to the timestamp index, this is used during
        # time based filtering. If this is not done we would need to
        # unserialize all events to get the timestamp value which is really
        # slow.
        timestamp_str.append(struct.pack('<q', timestamp))
      except struct.error as exception:
        # TODO: Instead of just logging the error unserialize the event
        # and print out information from the event, eg. parser and path spec
        # location. That way we can find the root cause and fix that instead
        # of just catching the exception.
        logging.error((
            u'Unable to store event, not able to index timestamp value with '
            u'error: {0:s} [timestamp: {1:d}]').format(exception, timestamp))
        continue
      index_str.append(struct.pack('<I', ofs))
      packed = struct.pack('<I', len(entry)) + entry
      ofs += len(packed)
      proto_str.append(packed)

    stream_name = 'plaso_index.{0:06d}'.format(self._file_number)
    self._WriteStream(stream_name, ''.join(index_str))

    stream_name = 'plaso_proto.{0:06d}'.format(self._file_number)
    self._WriteStream(stream_name, ''.join(proto_str))

    stream_name = 'plaso_timestamps.{0:06d}'.format(self._file_number)
    self._WriteStream(stream_name, ''.join(timestamp_str))

    self._file_number += 1
    self._buffer_size = 0
    self._buffer = []
    self._buffer_first_timestamp = sys.maxint
    self._buffer_last_timestamp = 0

  def _GetEventTagIndexValue(self, store_number, store_index, uuid):
    """Retrieves an event tag index value.

    Args:
      store_number: the store number.
      store_index: the store index.
      uuid: the UUID string.

    Returns:
      An event tag index value (instance of _EventTagIndexValue).
    """
    if self._event_tag_index is None:
      self._BuildTagIndex()

    # Try looking up event tag by numeric identifier.
    tag_identifier = '{0:d}:{1:d}'.format(store_number, store_index)
    tag_index_value = self._event_tag_index.get(tag_identifier, None)

    # Try looking up event tag by UUID.
    if tag_index_value is None:
      tag_index_value = self._event_tag_index.get(uuid, None)

    return tag_index_value

  def _GetStreamNames(self):
    """Retrieves a generator of the storage stream names."""
    if self._zipfile:
      for stream_name in self._zipfile.namelist():
        yield stream_name

  def _GetEventObjectProtobufString(self, stream_number, entry_index=-1):
    """Returns a specific event object protobuf string.

    By default the next entry in the appropriate proto file is read
    and returned, however any entry can be read using the index file.

    Args:
      stream_number: The proto stream number.
      entry_index: Read a specific entry in the file. The default is -1,
                   which represents the next available entry.

    Returns:
      A tuple containing the event object protobuf string and the entry index
      of the event object protobuf string within the storage file.

    Raises:
      EOFError: When we reach the end of the protobuf file.
      errors.WrongProtobufEntry: If the protobuf size is too large for storage.
      IOError: if the stream cannot be opened.
    """
    file_object, last_entry_index = self._GetProtoStream(stream_number)

    if entry_index >= 0:
      stream_offset = self._GetProtoStreamOffset(stream_number, entry_index)
      if stream_offset is None:
        logging.error((
            u'Unable to read entry index: {0:d} from proto stream: '
            u'{1:d}').format(entry_index, stream_number))

        return None, None

      file_object, last_entry_index = self._GetProtoStreamSeekOffset(
          stream_number, entry_index, stream_offset)

    if (not last_entry_index and entry_index == -1 and
        self._bound_first is not None):
      # We only get here if the following conditions are met:
      #   1. last_entry_index is not set (so this is the first read
      #      from this file).
      #   2. There is a lower bound (so we have a date filter).
      #   3. The lower bound is higher than zero (basically set to a value).
      #   4. We are accessing this function using 'get me the next entry' as
      #      opposed to 'get me entry X', where we just want to serve entry X.
      #
      # The purpose: speed seeking into the storage file based on time. Instead
      # of spending precious time reading through the storage file and
      # deserializing protobufs just to compare timestamps we read a much
      # 'cheaper' file, one that only contains timestamps to find the proper
      # entry into the storage file. That way we'll get to the right place in
      # the file and can start reading protobufs from the right location.

      stream_name = 'plaso_timestamps.{0:06d}'.format(stream_number)

      if stream_name in self._GetStreamNames():
        timestamp_file_object = self._OpenStream(stream_name, 'r')
        if timestamp_file_object is None:
          raise IOError(u'Unable to open stream: {0:s}'.format(stream_name))

        index = 0
        timestamp_compare = 0
        encountered_error = False
        while timestamp_compare < self._bound_first:
          timestamp_raw = timestamp_file_object.read(8)
          if len(timestamp_raw) != 8:
            encountered_error = True
            break

          timestamp_compare = struct.unpack('<q', timestamp_raw)[0]
          index += 1

        if encountered_error:
          return None, None

        return self._GetEventObjectProtobufString(
            stream_number, entry_index=index)

    size_data = file_object.read(4)

    if len(size_data) != 4:
      return None, None

    proto_string_size = struct.unpack('<I', size_data)[0]

    if proto_string_size > self.MAX_PROTO_STRING_SIZE:
      raise errors.WrongProtobufEntry(
          u'Protobuf string size value exceeds maximum: {0:d}'.format(
              proto_string_size))

    event_object_data = file_object.read(proto_string_size)
    self._proto_streams[stream_number] = (file_object, last_entry_index + 1)

    return event_object_data, last_entry_index

  def _GetEventGroupProto(self, file_object):
    """Return a single group entry."""
    unpacked = file_object.read(4)
    if len(unpacked) != 4:
      return None

    size = struct.unpack('<I', unpacked)[0]

    if size > StorageFile.MAX_PROTO_STRING_SIZE:
      raise errors.WrongProtobufEntry(
          u'Protobuf size too large: {0:d}'.format(size))

    proto_serialized = file_object.read(size)
    proto = plaso_storage_pb2.EventGroup()

    proto.ParseFromString(proto_serialized)
    return proto

  def _GetProtoStream(self, stream_number):
    """Retrieves the proto stream.

    Args:
      stream_number: the number of the stream.

    Returns:
      A tuple of the stream file-like object and the last entry index to
      which the offset of the stream file-like object points.

    Raises:
      IOError: if the stream cannot be opened.
    """
    if stream_number not in self._proto_streams:
      stream_name = 'plaso_proto.{0:06d}'.format(stream_number)

      file_object = self._OpenStream(stream_name, 'r')
      if file_object is None:
        raise IOError(u'Unable to open stream: {0:s}'.format(stream_name))

      # TODO: change this to a value object and track the stream offset as
      # well. This allows reducing the number of re-opens when the seek offset
      # is beyond the current offset.
      self._proto_streams[stream_number] = (file_object, 0)

    return self._proto_streams[stream_number]

  def _GetProtoStreamSeekOffset(
      self, stream_number, entry_index, stream_offset):
    """Retrieves the proto stream and seeks a specified offset in the stream.

    Args:
      stream_number: the number of the stream.
      entry_index: the entry index.
      stream_offset: the offset relative to the start of the stream.

    Returns:
      A tuple of the stream file-like object and the last index.

    Raises:
      IOError: if the stream cannot be opened.
    """
    # Since zipfile.ZipExtFile is not seekable we need to close the stream
    # and reopen it to fake a seek.
    if stream_number in self._proto_streams:
      previous_file_object, _ = self._proto_streams[stream_number]
      del self._proto_streams[stream_number]
      previous_file_object.close()

    stream_name = 'plaso_proto.{0:06d}'.format(stream_number)
    file_object = self._OpenStream(stream_name, 'r')
    if file_object is None:
      raise IOError(u'Unable to open stream: {0:s}'.format(stream_name))

    # Since zipfile.ZipExtFile is not seekable we need to read up to
    # the stream offset.
    _ = file_object.read(stream_offset)

    self._proto_streams[stream_number] = (file_object, entry_index)

    return self._proto_streams[stream_number]

  def _GetProtoStreamOffset(self, stream_number, entry_index):
    """Retrieves the offset of a proto stream entry from the index stream.

    Args:
      stream_number: the number of the stream.
      entry_index: the entry index.

    Returns:
      The offset of the entry in the corresponding proto stream
      or None on error.

    Raises:
      IOError: if the stream cannot be opened.
    """
    # TODO: cache the index file object in the same way as the proto
    # stream file objects.

    # TODO: once cached use the last entry index to determine if the stream
    # file object should be re-opened.

    stream_name = 'plaso_index.{0:06d}'.format(stream_number)
    index_file_object = self._OpenStream(stream_name, 'r')
    if index_file_object is None:
      raise IOError(u'Unable to open stream: {0:s}'.format(stream_name))

    # Since zipfile.ZipExtFile is not seekable we need to read up to
    # the stream offset.
    _ = index_file_object.read(entry_index * 4)

    index_data = index_file_object.read(4)

    index_file_object.close()

    if len(index_data) != 4:
      return None

    return struct.unpack('<I', index_data)[0]

  def _OpenStream(self, stream_name, mode='r'):
    """Opens a stream.

    Args:
      stream_name: the name of the stream.
      mode: the access mode. The default is read-only ('r').

    Returns:
      The stream file-like object (instance of zipfile.ZipExtFile) or None.
    """
    try:
      return self._zipfile.open(stream_name, mode)
    except KeyError:
      return

  def _ReadEventTag(self, file_object):
    """Reads an event tag from the storage file.

    Returns:
      An event tag (instance of EventTag).
    """
    event_tag_data = file_object.read(4)
    if len(event_tag_data) != 4:
      return None

    proto_string_size = self.INTEGER.parse(event_tag_data)

    if proto_string_size > self.MAX_PROTO_STRING_SIZE:
      raise errors.WrongProtobufEntry(
          u'Protobuf string size value exceeds maximum: {0:d}'.format(
              proto_string_size))

    proto_string = file_object.read(proto_string_size)
    return self._event_tag_serializer.ReadSerialized(proto_string)

  def _ReadEventTagByIdentifier(self, store_number, store_index, uuid):
    """Reads an event tag by identifier.

    Args:
      store_number: the store number.
      store_index: the store index.
      uuid: the UUID string.

    Returns:
      The event tag (instance of EventTag).

    Raises:
      IOError: if the stream cannot be opened.
    """
    tag_index_value = self._GetEventTagIndexValue(
        store_number, store_index, uuid)
    if tag_index_value is None:
      return

    stream_name = 'plaso_tagging.{0:06d}'.format(tag_index_value.store_number)
    tag_file_object = self._OpenStream(stream_name, 'r')
    if tag_file_object is None:
      raise IOError(u'Unable to open stream: {0:s}'.format(stream_name))

    # Since zipfile.ZipExtFile is not seekable we need to read up to
    # the store offset.
    _ = tag_file_object.read(tag_index_value.store_offset)
    return self._ReadEventTag(tag_file_object)

  def _ReadStream(self, stream_name):
    """Reads the data in a stream.

    Args:
      stream_name: the name of the stream.

    Returns:
      A byte string containing the data of the stream.
    """
    data_segments = []
    file_object = self._OpenStream(stream_name, 'r')

    # zipfile.ZipExtFile does not support the with-statement interface.
    if file_object:
      data = file_object.read(self._STREAM_DATA_SEGMENT_SIZE)
      while data:
        data_segments.append(data)
        data = file_object.read(self._STREAM_DATA_SEGMENT_SIZE)

      file_object.close()

    return ''.join(data_segments)

  def _SetEventObjectSerializer(self, serializer_string):
    """Set the serializer for the event object."""
    if serializer_string == 'json':
      self._event_object_serializer = (
          json_serializer.JsonEventObjectSerializer)
      self._event_serializer_format_string = 'json'
    else:
      self._event_object_serializer = (
          protobuf_serializer.ProtobufEventObjectSerializer)
      self._event_serializer_format_string = 'proto'

  def _WritePreprocessObject(self, pre_obj):
    """Writes a preprocess object to the storage file.

    Args:
      pre_obj: the preprocess object (instance of PreprocessObject).

    Raises:
      IOError: if the stream cannot be opened.
    """
    existing_stream_data = self._ReadStream('information.dump')

    # Store information about the store range for this particular
    # preprocessing object. This will determine which stores
    # this information is applicable for.
    stores = list(self.GetProtoNumbers())
    if stores:
      end = stores[-1] + 1
    else:
      end = self._first_file_number
    pre_obj.store_range = (self._first_file_number, end)

    pre_obj_data = self._pre_obj_serializer.WriteSerialized(pre_obj)

    stream_data = ''.join([
        existing_stream_data,
        struct.pack('<I', len(pre_obj_data)), pre_obj_data])

    self._WriteStream('information.dump', stream_data)

  def _WriteStream(self, stream_name, stream_data):
    """Writes the data to a stream.

    Args:
      stream_name: the name of the stream.
      stream_data: the data of the stream.
    """
    self._zipfile.writestr(stream_name, stream_data)

  def Close(self):
    """Closes the storage, flushes the last buffer and closes the ZIP file."""
    if self._file_open:
      if not self._read_only and self._pre_obj:
        self._WritePreprocessObject(self._pre_obj)

      self._FlushBuffer()
      self._zipfile.close()
      self._file_open = False
      if not self._read_only:
        logging.info((
            u'[Storage] Closing the storage, number of events processed: '
            u'{0:d}').format(self._write_counter))

  def GetGrouping(self):
    """Return a generator that reads all grouping information from storage.

    Raises:
      IOError: if the stream cannot be opened.
    """
    if not self.HasGrouping():
      return

    for stream_name in self._GetStreamNames():
      if stream_name.startswith('plaso_grouping.'):
        file_object = self._OpenStream(stream_name, 'r')
        if file_object is None:
          raise IOError(u'Unable to open stream: {0:s}'.format(stream_name))

        group_entry = self._GetEventGroupProto(file_object)
        while group_entry:
          yield group_entry
          group_entry = self._GetEventGroupProto(file_object)

  def GetNumberOfEvents(self):
    """Retrieves the number of event objects in a storage file."""
    total_events = 0
    if hasattr(self, 'GetStorageInformation'):
      for store_info in self.GetStorageInformation():
        if hasattr(store_info, 'stores'):
          stores = store_info.stores.values()
          for store_file in stores:
            if type(store_file) is dict and 'count' in store_file:
              total_events += store_file['count']

    return total_events

  def GetEventsFromGroup(self, group_proto):
    """Return a generator with all EventObjects from a group."""
    for group_event in group_proto.events:
      yield self.GetEventObject(
          group_event.store_number, entry_index=group_event.store_index)

  def GetTagging(self):
    """Return a generator that reads all tagging information from storage.

    This function reads all tagging files inside the storage and returns
    the EventTagging protobuf, and only that protobuf.

    To get the full EventObject with tags attached it is possible to use
    GetTaggedEvent and pass the EventTagging protobuf to it.

    Yields:
      All EventTag objects stored inside the storage container.

    Raises:
      IOError: if the stream cannot be opened.
    """
    for stream_name in self._GetStreamNames():
      if stream_name.startswith('plaso_tagging.'):
        file_object = self._OpenStream(stream_name, 'r')
        if file_object is None:
          raise IOError(u'Unable to open stream: {0:s}'.format(stream_name))

        tag_entry = self._ReadEventTag(file_object)
        while tag_entry:
          yield tag_entry
          tag_entry = self._ReadEventTag(file_object)

  def GetTaggedEvent(self, tag_event):
    """Read in an EventTag object from a tag and return an EventObject.

    This function uses the information inside the EventTag object
    to open the EventObject that was tagged and returns it, with the
    tag information attached to it.

    Args:
      tag_event: An EventTag object.

    Returns:
      An EventObject with the EventTag object attached.
    """
    evt = self.GetEventObject(
        tag_event.store_number, entry_index=tag_event.store_index)
    if not evt:
      return None

    evt.tag = tag_event

    return evt

  def GetStorageInformation(self):
    """Retrieves storage (preprocessing) information stored in the storage file.

    Returns:
      A list of preprocessing objects (instances of PreprocessingObject)
      that contain the storage information.
    """
    information = []

    file_object = self._OpenStream('information.dump', 'r')
    if file_object is None:
      return information

    while True:
      unpacked = file_object.read(4)
      if len(unpacked) != 4:
        break

      size = struct.unpack('<I', unpacked)[0]

      if size > self.MAX_PROTO_STRING_SIZE:
        raise errors.WrongProtobufEntry(
            u'Protobuf size too large: {0:d}'.format(size))

      serialized_pre_obj = file_object.read(size)
      try:
        info = self._pre_obj_serializer.ReadSerialized(serialized_pre_obj)
      except message.DecodeError:
        logging.error(u'Unable to parse preprocessing object, bailing out.')
        break

      information.append(info)

    stores = list(self.GetProtoNumbers())
    information[-1].stores = {}
    information[-1].stores['Number'] = len(stores)
    for store_number in stores:
      store_identifier = 'Store {0:d}'.format(store_number)
      information[-1].stores[store_identifier] = self.ReadMeta(store_number)

    return information

  def SetStoreLimit(self, unused_my_filter=None):
    """Set a limit to the stores used for returning data."""
    # Retrieve set first and last timestamps.
    self._bound_first, self._bound_last = pfilter.TimeRangeCache.GetTimeRange()

    self.store_range = []

    # TODO: Fetch a filter object from the filter query.

    for number in self.GetProtoNumbers():
      # TODO: Read more criteria from here.
      first, last = self.ReadMeta(number).get('range', (0, limit.MAX_INT64))
      if last < first:
        logging.error(
            u'last: {0:d} first: {1:d} container: {2:d} (last < first)'.format(
                last, first, number))

      if first <= self._bound_last and self._bound_first <= last:
        # TODO: Check at least parser and data_type (stored in metadata).
        # Check whether these attributes exist in filter, if so use the filter
        # to determine whether the stores should be included.
        self.store_range.append(number)

      else:
        logging.debug(u'Store [{0:d}] not used'.format(number))

  def GetSortedEntry(self):
    """Return a sorted entry from the storage file.

    Returns:
      An event object (instance of EventObject).
    """
    if self._bound_first is None:
      self._bound_first, self._bound_last = (
          pfilter.TimeRangeCache.GetTimeRange())

    if not hasattr(self, '_merge_buffer'):
      self._merge_buffer = []
      number_range = getattr(self, 'store_range', list(self.GetProtoNumbers()))
      for store_number in number_range:
        event_object = self.GetEventObject(store_number)
        if not event_object:
          return

        while event_object.timestamp < self._bound_first:
          event_object = self.GetEventObject(store_number)
          if not event_object:
            return

        heapq.heappush(
            self._merge_buffer,
            (event_object.timestamp, store_number, event_object))

    if not self._merge_buffer:
      return

    _, store_number, event_read = heapq.heappop(self._merge_buffer)
    if not event_read:
      return

    # Stop as soon as we hit the upper bound.
    if event_read.timestamp > self._bound_last:
      return

    new_event_object = self.GetEventObject(store_number)

    if new_event_object:
      heapq.heappush(
          self._merge_buffer,
          (new_event_object.timestamp, store_number, new_event_object))

    event_read.tag = self._ReadEventTagByIdentifier(
        event_read.store_number, event_read.store_index, event_read.uuid)

    return event_read

  def GetEventObject(self, stream_number, entry_index=-1):
    """Reads an event object from the store.

    By default the next entry in the appropriate proto file is read
    and returned, however any entry can be read using the index file.

    Args:
      stream_number: The proto stream number.
      entry_index: Read a specific entry in the file. The default is -1,
                   which represents the next available entry.

    Returns:
      An event object (instance of EventObject) entry read from the file or
      None if not able to read in a new event.
    """
    event_object_data, entry_index = self._GetEventObjectProtobufString(
        stream_number, entry_index=entry_index)
    if not event_object_data:
      return

    event_object = self._event_object_serializer.ReadSerialized(
        event_object_data)
    event_object.store_number = stream_number
    event_object.store_index = entry_index

    return event_object

  def GetEntries(self, number):
    """A generator to read all plaso_storage protobufs.

    The storage mechanism of Plaso works in the way that it creates potentially
    several files inside the ZIP container. As soon as the total size of the
    serialized protobufs exceeds buffer_size they are flushed to disk as:

          plaso_proto.XXX

    Where XXX is an increasing integer, starting from one. To get all the
    files or the numbers that are available this class implements a method
    called GetProtoNumbers() that returns a list of all available protobuf
    files within the container.

    This method returns a generator that returns all plaso_storage protobufs
    in the named container, as indicated by the number argument. So if this
    method is called as storage_object.GetEntries(1) the generator will return
    the entries found in the file plaso_proto.000001.

    Args:
      number: The protofile number.

    Yields:
      A protobuf object from the protobuf file.
    """
    # TODO: Change this function, don't accept a store number and implement the
    # MergeSort functionality of the psort file in here. This will then always
    # return the sorted entries from the storage file, implementing the second
    # stage of the sort/merge algorithm.
    while True:
      try:
        proto = self.GetEventObject(number)
        if not proto:
          logging.debug(
              u'End of protobuf file plaso_proto.{0:06d} reached.'.format(
                  number))
          break
        yield proto
      except errors.WrongProtobufEntry as exception:
        logging.warning((
            u'Problem while parsing a protobuf entry from: '
            u'plaso_proto.{0:06d} with error: {1:s}').format(number, exception))

  def GetProtoNumbers(self):
    """Return all available protobuf numbers."""
    numbers = []
    for name in self._GetStreamNames():
      if 'plaso_proto' in name:
        _, num = name.split('.')
        numbers.append(int(num))

    for number in sorted(numbers):
      yield number

  def ReadMeta(self, number):
    """Return a dict with the metadata entries.

    Args:
      number: The number of the metadata file (the name is plaso_meta.XXX
              where XXX is this number).

    Returns:
      A dict object containing all the variables inside the metadata file.

    Raises:
      IOError: if the stream cannot be opened.
    """
    stream_name = 'plaso_meta.{0:06d}'.format(number)
    file_object = self._OpenStream(stream_name, 'r')
    if file_object is None:
      raise IOError(u'Unable to open stream: {0:s}'.format(stream_name))
    return yaml.safe_load(file_object)

  def GetBufferSize(self):
    """Return the size of the buffer."""
    return self._buffer_size

  def GetFileNumber(self):
    """Return the current file number of the storage."""
    return self._file_number

  def AddEventObject(self, event_object):
    """Adds an event object to the storage.

    Args:
      event_object: an event object (instance of EventObject).

    Raises:
      IOError: When trying to write to a closed storage file.
    """
    if not self._file_open:
      raise IOError(u'Trying to add an entry to a closed storage file.')

    if event_object.timestamp > self._buffer_last_timestamp:
      self._buffer_last_timestamp = event_object.timestamp

    # TODO: support negative timestamps.
    if (event_object.timestamp < self._buffer_first_timestamp and
        event_object.timestamp > 0):
      self._buffer_first_timestamp = event_object.timestamp

    attributes = event_object.GetValues()
    # Add values to counters.
    if self._pre_obj:
      self._pre_obj.counter['total'] += 1
      self._pre_obj.counter[attributes.get('parser', 'N/A')] += 1
      if 'plugin' in attributes:
        self._pre_obj.plugin_counter[attributes.get('plugin', 'N/A')] += 1

    # Add to temporary counter.
    self._count_data_type[event_object.data_type] += 1
    parser = attributes.get('parser', 'unknown_parser')
    self._count_parser[parser] += 1

    event_object_data = self._event_object_serializer.WriteSerialized(
        event_object)

    # TODO: Re-think this approach with the re-design of the storage.
    # Check if the event object failed to serialize (none is returned).
    if event_object_data is None:
      return

    heapq.heappush(
        self._buffer, (event_object.timestamp, event_object_data))
    self._buffer_size += len(event_object_data)
    self._write_counter += 1

    if self._buffer_size > self._max_buffer_size:
      self._FlushBuffer()

  def AddEventObjects(self, event_objects):
    """Adds event objects to the storage.

    Args:
      event_objects: a list or generator of event objects (instances of
                     EventObject).
    """
    for event_object in event_objects:
      self.AddEventObject(event_object)

  def HasTagging(self):
    """Return a bool indicating whether or not a Tag file is stored."""
    for name in self._GetStreamNames():
      if 'plaso_tagging.' in name:
        return True
    return False

  def HasGrouping(self):
    """Return a bool indicating whether or not a Group file is stored."""
    for name in self._GetStreamNames():
      if 'plaso_grouping.' in name:
        return True
    return False

  def HasReports(self):
    """Return a bool indicating whether or not a Report file is stored."""
    for name in self._GetStreamNames():
      if 'plaso_report.' in name:
        return True

    return False

  def StoreReport(self, analysis_report):
    """Store an analysis report.

    Args:
      analysis_report: An analysis report object (instance of AnalysisReport).
    """
    report_number = 1
    for name in self._GetStreamNames():
      if 'plaso_report.' in name:
        _, _, number_string = name.partition('.')
        try:
          number = int(number_string, 10)
        except ValueError:
          logging.error(u'Unable to read in report number.')
          number = 0
        if number >= report_number:
          report_number = number + 1

    stream_name = 'plaso_report.{0:06}'.format(report_number)
    serialized_report_proto = self._analysis_report_serializer.WriteSerialized(
        analysis_report)
    self._WriteStream(stream_name, serialized_report_proto)

  def GetReports(self):
    """Read in all stored analysis reports from storage and yield them.

    Raises:
      IOError: if the stream cannot be opened.
    """
    for stream_name in self._GetStreamNames():
      if stream_name.startswith('plaso_report.'):
        file_object = self._OpenStream(stream_name, 'r')
        if file_object is None:
          raise IOError(u'Unable to open stream: {0:s}'.format(stream_name))

        report_string = file_object.read(self.MAX_REPORT_PROTOBUF_SIZE)
        yield self._analysis_report_serializer.ReadSerialized(report_string)

  def StoreGrouping(self, rows):
    """Store group information into the storage file.

    An EventGroup protobuf stores information about several
    EventObjects that belong to the same behavior or action. It can then
    be used to group similar events together to create a super event, or
    a higher level event.

    This function is used to store that information inside the storage
    file so it can be read later.

    The object that is passed in needs to have an iterator implemented
    and has to implement the following attributes (optional names within
    brackets):
      name - The name of the grouped event.
      [description] - More detailed description of the event.
      [category] - If this group of events falls into a specific category.
      [color] - To highlight this particular group with a HTML color tag.
      [first_timestamp] - The first timestamp if applicable of the group.
      [last_timestamp] - The last timestamp if applicable of the group.
      events - A list of tuples (store_number and store_index of the
               EventObject protobuf that belongs to this group of events).

    Args:
      rows: An object that contains the necessary fields to construct
            an EventGroup. Has to be a generator object or an object that
            implements an iterator.
    """
    group_number = 1
    if self.HasGrouping():
      for name in self._GetStreamNames():
        if 'plaso_grouping.' in name:
          _, number = name.split('.')
          if int(number) >= group_number:
            group_number = int(number) + 1

    group_packed = []
    size = 0
    for row in rows:
      group = plaso_storage_pb2.EventGroup()
      group.name = row.name
      if hasattr(row, 'description'):
        group.description = utils.GetUnicodeString(row.description)
      if hasattr(row, 'category'):
        group.category = utils.GetUnicodeString(row.category)
      if hasattr(row, 'color'):
        group.color = utils.GetUnicodeString(row.color)

      for number, index in row.events:
        evt = group.events.add()
        evt.store_number = int(number)
        evt.store_index = int(index)

      if hasattr(row, 'first_timestamp'):
        group.first_timestamp = int(row.first_timestamp)
      if hasattr(row, 'last_timestamp'):
        group.last_timestamp = int(row.last_timestamp)

      # TODO: implement event grouping.
      group_str = group.SerializeToString()
      packed = struct.pack('<I', len(group_str)) + group_str
      # TODO: Size is defined, should be used to determine if we've filled
      # our buffer size of group information. Check that and write a new
      # group store file in that case.
      size += len(packed)
      if size > self._max_buffer_size:
        logging.warning(u'Grouping has outgrown buffer size.')
      group_packed.append(packed)

    stream_name = 'plaso_grouping.{0:06d}'.format(group_number)
    self._WriteStream(stream_name, ''.join(group_packed))

  def StoreTagging(self, tags):
    """Store tag information into the storage file.

    Each EventObject can be tagged either manually or automatically
    to make analysis simpler, by providing more context to certain
    events or to highlight events for later viewing.

    The object passed in needs to be a list (or otherwise an iterator)
    that contains EventTag objects (event.EventTag).

    Args:
      tags: A list or an object providing an iterator that contains
            EventTag objects.

    Raises:
      IOError: if the stream cannot be opened.
    """
    if not self._pre_obj:
      self._pre_obj = event.PreprocessObject()

    if not hasattr(self._pre_obj, 'collection_information'):
      self._pre_obj.collection_information = {}

    self._pre_obj.collection_information['Action'] = 'Adding tags to storage.'
    self._pre_obj.collection_information['time_of_run'] = (
        timelib.Timestamp.GetNow())
    if not hasattr(self._pre_obj, 'counter'):
      self._pre_obj.counter = collections.Counter()

    tag_number = 1
    for name in self._GetStreamNames():
      if 'plaso_tagging.' in name:
        _, number = name.split('.')
        if int(number) >= tag_number:
          tag_number = int(number) + 1
        if self._event_tag_index is None:
          self._BuildTagIndex()

    tag_packed = []
    tag_index = []
    size = 0
    for tag in tags:
      self._pre_obj.counter['Total Tags'] += 1
      if hasattr(tag, 'tags'):
        for tag_entry in tag.tags:
          self._pre_obj.counter[tag_entry] += 1

      if self._event_tag_index is not None:
        tag_index_value = self._event_tag_index.get(tag.string_key, None)
      else:
        tag_index_value = None

      # This particular event has already been tagged on a previous occasion,
      # we need to make sure we are appending to that particular tag.
      if tag_index_value is not None:
        stream_name = 'plaso_tagging.{0:06d}'.format(
            tag_index_value.store_number)

        tag_file_object = self._OpenStream(stream_name, 'r')
        if tag_file_object is None:
          raise IOError(u'Unable to open stream: {0:s}'.format(stream_name))

        # Since zipfile.ZipExtFile is not seekable we need to read up to
        # the store offset.
        _ = tag_file_object.read(tag_index_value.store_offset)

        old_tag = self._ReadEventTag(tag_file_object)

        # TODO: move the append functionality into EventTag.
        # Maybe name the function extend or update?
        if hasattr(old_tag, 'tags'):
          tag.tags.extend(old_tag.tags)

        if hasattr(old_tag, 'comment'):
          if hasattr(tag, 'comment'):
            tag.comment += old_tag.comment
          else:
            tag.comment = old_tag.comment

        if hasattr(old_tag, 'color') and not hasattr(tag, 'color'):
          tag.color = old_tag.color

      serialized_event_tag = self._event_tag_serializer.WriteSerialized(tag)

      # TODO: move to a write class function of _EventTagIndexValue.
      packed = (
          struct.pack('<I', len(serialized_event_tag)) + serialized_event_tag)
      ofs = struct.pack('<I', size)
      if getattr(tag, 'store_number', 0):
        struct_string = (
            construct.Byte('type').build(1) + ofs +
            _EventTagIndexValue.TAG_STORE_STRUCT.build(tag))
      else:
        struct_string = (
            construct.Byte('type').build(2) + ofs +
            _EventTagIndexValue.TAG_UUID_STRUCT.build(tag))

      tag_index.append(struct_string)
      size += len(packed)
      tag_packed.append(packed)

    stream_name = 'plaso_tag_index.{0:06d}'.format(tag_number)
    self._WriteStream(stream_name, ''.join(tag_index))

    stream_name = 'plaso_tagging.{0:06d}'.format(tag_number)
    self._WriteStream(stream_name, ''.join(tag_packed))

    # TODO: Update the tags that have changed in the index instead
    # of flushing the index.

    # If we already built a list of tags in memory we need to clear it
    # since the tags have changed.
    if self._event_tag_index is not None:
      del self._event_tag_index
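
# A minimal usage sketch of the read path, grounded in the methods above; it
# is illustrative only and not used by this module. The 'path' parameter and
# the function name are hypothetical. GetSortedEntry() merges the individual
# stores by timestamp and returns None when the storage is exhausted.
def _ExamplePrintSortedTimestamps(path):
  """Illustrative sketch: prints the timestamps of all stored events."""
  with StorageFile(path, read_only=True) as storage_file:
    event_object = storage_file.GetSortedEntry()
    while event_object:
      print event_object.timestamp
      event_object = storage_file.GetSortedEntry()
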

class StorageFileWriter(queue.EventObjectQueueConsumer):
  """Class that implements a storage file writer object."""

  def __init__(
      self, storage_queue, output_file, buffer_size=0, pre_obj=None,
      serializer_format='proto'):
    """Initializes the storage file writer.

    Args:
      storage_queue: the storage queue (instance of Queue).
      output_file: The path to the output file.
      buffer_size: The estimated size of a protobuf file.
      pre_obj: A preprocessing object (instance of PreprocessObject).
      serializer_format: A string containing either "proto" or "json".
                         Defaults to proto.
    """
    super(StorageFileWriter, self).__init__(storage_queue)
    self._buffer_size = buffer_size
    self._output_file = output_file
    self._pre_obj = pre_obj
    self._serializer_format = serializer_format
    self._storage_file = None

  def _ConsumeEventObject(self, event_object, **unused_kwargs):
    """Consumes an event object callback for ConsumeEventObjects."""
    self._storage_file.AddEventObject(event_object)

  def WriteEventObjects(self):
    """Writes the event objects that are pushed on the queue."""
    self._storage_file = StorageFile(
        self._output_file, buffer_size=self._buffer_size,
        pre_obj=self._pre_obj, serializer_format=self._serializer_format)
    self.ConsumeEventObjects()
    self._storage_file.Close()


class BypassStorageWriter(queue.EventObjectQueueConsumer):
  """Watch a queue with EventObjects and send them directly to output."""

  def __init__(
      self, storage_queue, output_file, output_module_string='l2tcsv',
      pre_obj=None):
    """Initializes the bypass storage writer.

    Args:
      storage_queue: the storage queue (instance of Queue).
      output_file: The path to the output file.
      output_module_string: The output module string.
      pre_obj: A preprocessing object (instance of PreprocessObject).
    """
    super(BypassStorageWriter, self).__init__(storage_queue)
    self._output_file = output_file
    self._output_module = None
    self._output_module_string = output_module_string
    self._pre_obj = pre_obj
    self._pre_obj.store_range = (1, 1)

  def _ConsumeEventObject(self, event_object, **unused_kwargs):
    """Consumes an event object callback for ConsumeEventObjects."""
    # Set the store number and index to default values since they are not used.
    event_object.store_number = 1
    event_object.store_index = -1

    self._output_module.WriteEvent(event_object)

  # Typically a storage object provides this function, as in you can call
  # store.GetStorageInformation() and it will read the information from the
  # store. However, in this case we are not actually using a storage file,
  # we are using a "storage bypass" file. Since some parts of the codebase
  # expect this method to exist (an interface, if you want to call it that,
  # although the storage has not been abstracted into an interface, perhaps
  # it should be), it has to be provided here as well. The expected behavior
  # is to return a list of all available storage information objects (all
  # pre_obj objects stored in the storage file).
  def GetStorageInformation(self):
    """Return information about the storage object (used by output modules)."""
    return [self._pre_obj]

  def WriteEventObjects(self):
    """Writes the event objects that are pushed on the queue."""
    output_class = output.GetOutputFormatter(self._output_module_string)
    if not output_class:
      output_class = output.GetOutputFormatter('L2tcsv')

    self._output_module = output_class(
        self, self._output_file, config=self._pre_obj)

    self._output_module.Start()
    self.ConsumeEventObjects()
    self._output_module.End()