plaso-rubanetra/plaso/lib/storage_test.py
2020-04-06 18:48:34 +02:00

341 lines
12 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2012 The Plaso Project Authors.
# Please see the AUTHORS file for details on individual authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This file contains the tests for the event storage."""
import os
import tempfile
import shutil
import unittest
import zipfile
from plaso.engine import queue
from plaso.events import text_events
from plaso.events import windows_events
from plaso.formatters import manager as formatters_manager
from plaso.lib import event
from plaso.lib import eventdata
from plaso.lib import pfilter
from plaso.lib import storage
from plaso.lib import timelib_test
from plaso.multi_processing import multi_process
from plaso.formatters import winreg # pylint: disable=unused-import
from plaso.serializer import protobuf_serializer
class DummyObject(object):
  """Empty attribute container.

  GroupMock creates one of these per group and sets the group values on it
  as plain attributes.
  """
class GroupMock(object):
  """In-memory stand-in for an object that groups events together.

  Groups are recorded with AddGroup() and replayed, one DummyObject per
  group, when the mock is iterated.
  """

  def __init__(self):
    """Initializes the mock with an empty list of groups."""
    self.groups = []

  def AddGroup(
      self, name, events, desc=None, first=0, last=0, color=None, cat=None):
    """Records a new group of events.

    Args:
      name: the name of the group.
      events: the events that belong to the group. The test in this file
              passes a list of 2-tuples, presumably (store number,
              store index) pairs.
      desc: optional description of the group.
      first: optional first timestamp of the group.
      last: optional last timestamp of the group.
      color: optional color of the group.
      cat: optional category of the group.
    """
    group = (name, events, desc, first, last, color, cat)
    self.groups.append(group)

  def __iter__(self):
    """Yields a DummyObject for every recorded group.

    The name and events are always set. An optional value that evaluates to
    False is not set on the yielded object at all.
    """
    for group in self.groups:
      name, events, desc, first, last, color, cat = group

      group_object = DummyObject()
      group_object.name = name
      group_object.events = events

      if desc:
        group_object.description = desc

      # The timestamps are stored as integers.
      if first:
        group_object.first_timestamp = int(first)

      if last:
        group_object.last_timestamp = int(last)

      if color:
        group_object.color = color

      if cat:
        group_object.category = cat

      yield group_object
class TempDirectory(object):
  """Context manager that creates a temporary directory and removes it.

  Attributes:
    name: the path of the temporary directory, an empty string until the
          context is entered.
  """

  def __init__(self):
    """Initializes the object without creating a directory yet."""
    super(TempDirectory, self).__init__()
    self.name = u''

  def __enter__(self):
    """Creates the temporary directory.

    Returns:
      The path of the newly created directory (not the TempDirectory object).
    """
    directory_path = tempfile.mkdtemp()
    self.name = directory_path
    return directory_path

  def __exit__(self, unused_type, unused_value, unused_traceback):
    """Removes the temporary directory and everything in it.

    Errors raised while removing the directory are ignored.
    """
    shutil.rmtree(self.name, ignore_errors=True)
class StorageFileTest(unittest.TestCase):
"""Tests for the plaso storage file."""
def setUp(self):
"""Sets up the needed objects used throughout the test."""
self._event_objects = []
# TODO: replace hardcoded timestamps by timelib_test.CopyStringToTimestamp.
event_1 = windows_events.WindowsRegistryEvent(
13349615269295969, u'MY AutoRun key', {u'Value': u'c:/Temp/evil.exe'})
event_1.parser = 'UNKNOWN'
event_2 = windows_events.WindowsRegistryEvent(
13359662069295961, u'\\HKCU\\Secret\\EvilEmpire\\Malicious_key',
{u'Value': u'send all the exes to the other world'})
event_2.parser = 'UNKNOWN'
event_3 = windows_events.WindowsRegistryEvent(
13349402860000000, u'\\HKCU\\Windows\\Normal',
{u'Value': u'run all the benign stuff'})
event_3.parser = 'UNKNOWN'
text_dict = {'text': (
'This is a line by someone not reading the log line properly. And '
'since this log line exceeds the accepted 80 chars it will be '
'shortened.'), 'hostname': 'nomachine', 'username': 'johndoe'}
event_4 = text_events.TextEvent(12389344590000000, 12, text_dict)
event_4.parser = 'UNKNOWN'
self._event_objects.append(event_1)
self._event_objects.append(event_2)
self._event_objects.append(event_3)
self._event_objects.append(event_4)
def testStorageWriter(self):
"""Test the storage writer."""
self.assertEquals(len(self._event_objects), 4)
# The storage writer is normally run in a separate thread.
# For the purpose of this test it has to be run in sequence,
# hence the call to WriteEventObjects after all the event objects
# have been queued up.
# TODO: add upper queue limit.
test_queue = multi_process.MultiProcessingQueue()
test_queue_producer = queue.ItemQueueProducer(test_queue)
test_queue_producer.ProduceItems(self._event_objects)
test_queue_producer.SignalEndOfInput()
with tempfile.NamedTemporaryFile() as temp_file:
storage_writer = storage.StorageFileWriter(test_queue, temp_file)
storage_writer.WriteEventObjects()
z_file = zipfile.ZipFile(temp_file, 'r', zipfile.ZIP_DEFLATED)
expected_z_filename_list = [
'plaso_index.000001', 'plaso_meta.000001', 'plaso_proto.000001',
'plaso_timestamps.000001', 'serializer.txt']
z_filename_list = sorted(z_file.namelist())
self.assertEquals(len(z_filename_list), 5)
self.assertEquals(z_filename_list, expected_z_filename_list)
def testStorage(self):
"""Test the storage object."""
event_objects = []
timestamps = []
group_mock = GroupMock()
tags = []
tags_mock = []
groups = []
group_events = []
same_events = []
serializer = protobuf_serializer.ProtobufEventObjectSerializer
with TempDirectory() as dirname:
temp_file = os.path.join(dirname, 'plaso.db')
store = storage.StorageFile(temp_file)
store.AddEventObjects(self._event_objects)
# Add tagging.
tag_1 = event.EventTag()
tag_1.store_index = 0
tag_1.store_number = 1
tag_1.comment = 'My comment'
tag_1.color = 'blue'
tags_mock.append(tag_1)
tag_2 = event.EventTag()
tag_2.store_index = 1
tag_2.store_number = 1
tag_2.tags = ['Malware']
tag_2.color = 'red'
tags_mock.append(tag_2)
tag_3 = event.EventTag()
tag_3.store_number = 1
tag_3.store_index = 2
tag_3.comment = 'This is interesting'
tag_3.tags = ['Malware', 'Benign']
tag_3.color = 'red'
tags_mock.append(tag_3)
store.StoreTagging(tags_mock)
# Add additional tagging, second round.
tag_4 = event.EventTag()
tag_4.store_index = 1
tag_4.store_number = 1
tag_4.tags = ['Interesting']
store.StoreTagging([tag_4])
group_mock.AddGroup(
'Malicious', [(1, 1), (1, 2)], desc='Events that are malicious',
color='red', first=13349402860000000, last=13349615269295969,
cat='Malware')
store.StoreGrouping(group_mock)
store.Close()
read_store = storage.StorageFile(temp_file, read_only=True)
self.assertTrue(read_store.HasTagging())
self.assertTrue(read_store.HasGrouping())
for event_object in read_store.GetEntries(1):
event_objects.append(event_object)
timestamps.append(event_object.timestamp)
if event_object.data_type == 'windows:registry:key_value':
self.assertEquals(event_object.timestamp_desc,
eventdata.EventTimestamp.WRITTEN_TIME)
else:
self.assertEquals(event_object.timestamp_desc,
eventdata.EventTimestamp.WRITTEN_TIME)
for tag in read_store.GetTagging():
event_object = read_store.GetTaggedEvent(tag)
tags.append(event_object)
groups = list(read_store.GetGrouping())
self.assertEquals(len(groups), 1)
group_events = list(read_store.GetEventsFromGroup(groups[0]))
# Read the same events that were put in the group, just to compare
# against.
event_object = read_store.GetEventObject(1, 1)
serialized_event_object = serializer.WriteSerialized(event_object)
same_events.append(serialized_event_object)
event_object = read_store.GetEventObject(1, 2)
serialized_event_object = serializer.WriteSerialized(event_object)
same_events.append(serialized_event_object)
self.assertEquals(len(event_objects), 4)
self.assertEquals(len(tags), 4)
self.assertEquals(tags[0].timestamp, 12389344590000000)
self.assertEquals(tags[0].store_number, 1)
self.assertEquals(tags[0].store_index, 0)
self.assertEquals(tags[0].tag.comment, u'My comment')
self.assertEquals(tags[0].tag.color, u'blue')
msg, _ = formatters_manager.EventFormatterManager.GetMessageStrings(tags[0])
self.assertEquals(msg[0:10], u'This is a ')
self.assertEquals(tags[1].tag.tags[0], 'Malware')
msg, _ = formatters_manager.EventFormatterManager.GetMessageStrings(tags[1])
self.assertEquals(msg[0:15], u'[\\HKCU\\Windows\\')
self.assertEquals(tags[2].tag.comment, u'This is interesting')
self.assertEquals(tags[2].tag.tags[0], 'Malware')
self.assertEquals(tags[2].tag.tags[1], 'Benign')
self.assertEquals(tags[2].parser, 'UNKNOWN')
# Test the newly added fourth tag, which should include data from
# the first version as well.
self.assertEquals(tags[3].tag.tags[0], 'Interesting')
self.assertEquals(tags[3].tag.tags[1], 'Malware')
expected_timestamps = [
12389344590000000, 13349402860000000, 13349615269295969,
13359662069295961]
self.assertEquals(timestamps, expected_timestamps)
self.assertEquals(groups[0].name, u'Malicious')
self.assertEquals(groups[0].category, u'Malware')
self.assertEquals(groups[0].color, u'red')
self.assertEquals(groups[0].description, u'Events that are malicious')
self.assertEquals(groups[0].first_timestamp, 13349402860000000)
self.assertEquals(groups[0].last_timestamp, 13349615269295969)
self.assertEquals(len(group_events), 2)
self.assertEquals(group_events[0].timestamp, 13349402860000000)
self.assertEquals(group_events[1].timestamp, 13349615269295969L)
proto_group_events = []
for group_event in group_events:
serialized_event_object = serializer.WriteSerialized(group_event)
proto_group_events.append(serialized_event_object)
self.assertEquals(same_events, proto_group_events)
class StoreStorageTest(unittest.TestCase):
"""Test sorting storage file,"""
def setUp(self):
"""Setup sets parameters that will be reused throughout this test."""
# TODO: have sample output generated from the test.
# TODO: Use input data with a defined year. syslog parser chooses a
# year based on system clock; forcing updates to test file if regenerated.
self.test_file = os.path.join('test_data', 'psort_test.out')
self.first = timelib_test.CopyStringToTimestamp('2012-07-20 15:44:14')
self.last = timelib_test.CopyStringToTimestamp('2016-11-18 01:15:43')
def testStorageSort(self):
"""This test ensures that items read and output are in the expected order.
This method by design outputs data as it runs. In order to test this a
a modified output renderer is used for which the flush functionality has
been removed.
The test will be to read the TestEventBuffer storage and check to see
if it matches the known good sort order.
"""
pfilter.TimeRangeCache.ResetTimeConstraints()
pfilter.TimeRangeCache.SetUpperTimestamp(self.last)
pfilter.TimeRangeCache.SetLowerTimestamp(self.first)
store = storage.StorageFile(self.test_file, read_only=True)
store.store_range = [1, 5, 6]
read_list = []
event_object = store.GetSortedEntry()
while event_object:
read_list.append(event_object.timestamp)
event_object = store.GetSortedEntry()
expected_timestamps = [
1344270407000000L, 1392438730000000L, 1427151678000000L,
1451584472000000L]
self.assertEquals(read_list, expected_timestamps)
# Run the tests only when this file is executed as a script.
if __name__ == '__main__':
  unittest.main()