#!/usr/bin/python # -*- coding: utf-8 -*- # # Copyright 2014 The Plaso Project Authors. # Please see the AUTHORS file for details on individual authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Tests for the serializer object implementation using protobuf.""" import unittest from plaso.lib import event from plaso.proto import plaso_storage_pb2 from plaso.serializer import protobuf_serializer class ProtobufAnalysisReportSerializerTest(unittest.TestCase): """Tests for the protobuf analysis report serializer object.""" def setUp(self): """Sets up the needed objects used throughout the test.""" # TODO: add an analysis report test. pass def testReadSerialized(self): """Test the read serialized functionality.""" # TODO: add an analysis report test. pass def testWriteSerialized(self): """Test the write serialized functionality.""" # TODO: add an analysis report test. pass class ProtobufEventObjectSerializerTest(unittest.TestCase): """Tests for the protobuf event object serializer object.""" def setUp(self): """Sets up the needed objects used throughout the test.""" proto = plaso_storage_pb2.EventObject() proto.data_type = 'test:event2' proto.timestamp = 1234124 proto.timestamp_desc = 'Written' serializer = protobuf_serializer.ProtobufEventAttributeSerializer proto_attribute = proto.attributes.add() serializer.WriteSerializedObject(proto_attribute, 'zero_integer', 0) proto_attribute = proto.attributes.add() dict_object = { 'a': 'not b', 'c': 34, 'list': ['sf', 234], 'an': [234, 32]} serializer.WriteSerializedObject(proto_attribute, 'my_dict', dict_object) proto_attribute = proto.attributes.add() tuple_object = ( 'some item', [234, 52, 15], {'a': 'not a', 'b': 'not b'}, 35) serializer.WriteSerializedObject(proto_attribute, 'a_tuple', tuple_object) proto_attribute = proto.attributes.add() list_object = ['asf', 4234, 2, 54, 'asf'] serializer.WriteSerializedObject(proto_attribute, 'my_list', list_object) proto_attribute = proto.attributes.add() serializer.WriteSerializedObject( proto_attribute, 'unicode_string', u'And I\'m a unicorn.') proto_attribute = proto.attributes.add() serializer.WriteSerializedObject(proto_attribute, 'integer', 34) proto_attribute = proto.attributes.add() serializer.WriteSerializedObject(proto_attribute, 'string', 'Normal string') proto.uuid = '5a78777006de4ddb8d7bbe12ab92ccf8' self._proto_string = proto.SerializeToString() def testReadSerialized(self): """Test the read serialized functionality.""" serializer = protobuf_serializer.ProtobufEventObjectSerializer event_object = serializer.ReadSerialized(self._proto_string) # An integer value containing 0 should get stored. self.assertTrue(hasattr(event_object, 'zero_integer')) attribute_value = getattr(event_object, 'integer', 0) self.assertEquals(attribute_value, 34) attribute_value = getattr(event_object, 'my_list', []) self.assertEquals(len(attribute_value), 5) attribute_value = getattr(event_object, 'string', '') self.assertEquals(attribute_value, 'Normal string') attribute_value = getattr(event_object, 'unicode_string', u'') self.assertEquals(attribute_value, u'And I\'m a unicorn.') attribute_value = getattr(event_object, 'a_tuple', ()) self.assertEquals(len(attribute_value), 4) def testWriteSerialized(self): """Test the write serialized functionality.""" event_object = event.EventObject() event_object.data_type = 'test:event2' event_object.timestamp = 1234124 event_object.timestamp_desc = 'Written' # Prevent the event object for generating its own UUID. event_object.uuid = '5a78777006de4ddb8d7bbe12ab92ccf8' event_object.empty_string = u'' event_object.zero_integer = 0 event_object.integer = 34 event_object.string = 'Normal string' event_object.unicode_string = u'And I\'m a unicorn.' event_object.my_list = ['asf', 4234, 2, 54, 'asf'] event_object.my_dict = { 'a': 'not b', 'c': 34, 'list': ['sf', 234], 'an': [234, 32]} event_object.a_tuple = ( 'some item', [234, 52, 15], {'a': 'not a', 'b': 'not b'}, 35) event_object.null_value = None serializer = protobuf_serializer.ProtobufEventObjectSerializer proto_string = serializer.WriteSerialized(event_object) self.assertEquals(proto_string, self._proto_string) event_object = serializer.ReadSerialized(proto_string) # An empty string should not get stored. self.assertFalse(hasattr(event_object, 'empty_string')) # A None (or Null) value should not get stored. self.assertFalse(hasattr(event_object, 'null_value')) class ProtobufEventTagSerializerTest(unittest.TestCase): """Tests for the protobuf event tag serializer object.""" def setUp(self): """Sets up the needed objects used throughout the test.""" proto = plaso_storage_pb2.EventTagging() proto.store_number = 234 proto.store_index = 18 proto.comment = u'My first comment.' proto.color = u'Red' proto_tag = proto.tags.add() proto_tag.value = u'Malware' proto_tag = proto.tags.add() proto_tag.value = u'Common' self._proto_string = proto.SerializeToString() def testReadSerialized(self): """Test the read serialized functionality.""" serializer = protobuf_serializer.ProtobufEventTagSerializer event_tag = serializer.ReadSerialized(self._proto_string) self.assertEquals(event_tag.color, u'Red') self.assertEquals(event_tag.comment, u'My first comment.') self.assertEquals(event_tag.store_index, 18) self.assertEquals(len(event_tag.tags), 2) self.assertEquals(event_tag.tags, [u'Malware', u'Common']) def testWriteSerialized(self): """Test the write serialized functionality.""" event_tag = event.EventTag() event_tag.store_number = 234 event_tag.store_index = 18 event_tag.comment = u'My first comment.' event_tag.color = u'Red' event_tag.tags = [u'Malware', u'Common'] serializer = protobuf_serializer.ProtobufEventTagSerializer proto_string = serializer.WriteSerialized(event_tag) self.assertEquals(proto_string, self._proto_string) class ProtobufPreprocessObjectSerializerTest(unittest.TestCase): """Tests for the protobuf preprocess object serializer object.""" def setUp(self): """Sets up the needed objects used throughout the test.""" # TODO: add a preprocess object test. pass def testReadSerialized(self): """Test the read serialized functionality.""" # TODO: add a preprocess object test. pass def testWriteSerialized(self): """Test the write serialized functionality.""" # TODO: add a preprocess object test. pass if __name__ == '__main__': unittest.main()