Shared Bounded Queue in Pharo Smalltalk

[Figure: PharoScreenshot]

The SharedBoundedQueue>>#nextPut: method shown in the figure above is part of the implementation of a circular buffer that blocks writers when the buffer is full. Blocking readers when the buffer is empty is handled by the method below:

[Figure: SharedBoundedQueue>>#next]

SharedQueue, currently used to implement connections in SmallFBP, blocks only on reads. SharedBoundedQueue also allows its capacity to be expanded (see the methods below).
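As a rough illustration of the write-side blocking, here is a minimal Playground sketch. It uses only messages that appear in the file-out further down (capacity:, nextPut:, next) and follows the same fork/yield pattern as the unit tests; the variable names q and state are just for this example, and it assumes the forked process runs at the same priority as the caller.

```smalltalk
| q state |
q := SharedBoundedQueue capacity: 2.
q nextPut: #a; nextPut: #b.                    "the queue is now full"
state := #blocked.
[ q nextPut: #c. state := #unblocked ] fork.   "the writer has to wait for a free slot"
Processor yield.                               "give the forked writer a chance to run"
Transcript show: state printString; cr.        "the writer is still waiting at this point"
q next.                                        "removes #a, frees a slot and signals the monitor"
Processor yield.                               "the writer can now proceed"
Transcript show: state printString; cr.
```

A plain SharedQueue would accept the third nextPut: immediately, since it has no upper bound.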

[Figure: SharedBoundedQueue>>#grownBy:]

[Figure: SharedBoundedQueue>>#packed]
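The effect of growing a full queue can be sketched in a Playground as follows. This is only an illustration built from the messages in the file-out below, and the expected values in the comments mirror what testGrownByOnFull checks.

```smalltalk
| q |
q := SharedBoundedQueue capacity: 3.
q nextPut: 1; nextPut: 2; nextPut: 3.
q isFull.        "true"
q grownBy: 2.    "repacks the items at the start of a larger array"
q capacity.      "5"
q isFull.        "false"
q contents.      "#(1 2 3)"
```

Because grownBy: signals the monitor after resizing, a writer that was blocked on the full queue gets a chance to continue.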

An example using SharedBoundedQueue was adapted from an example in one of the posts about SmallFBP:

[Figures: exampleWithClosures, exampleWithClosures2, exampleWithClosures3]

In this small development I started experimenting with the AutoTest framework, which is in the spirit of continuous testing.

The file-outs of the implementation and the unit tests follow below; future versions can be found in the SmallFBP repository.

Stream subclass: #SharedBoundedQueue
	instanceVariableNames: 'capacity monitor items count start end'
	classVariableNames: ''
	poolDictionaries: ''
	category: 'SharedBoundedQueue'!
!SharedBoundedQueue commentStamp: 'FranciscoAryMartins 6/12/2015 19:56' prior: 0!
I'm a FIFO circular buffer that blocks readers when I'm empty and writers when I'm full.

I'm a subclass of Stream, but I implement only part of its protocol.

Public API and Key Messages

- capacity
- capacity:
- contents
- grownBy:
- isEmpty
- isFull
- next
- nextPut:
- peek
- size

- SharedBoundedQueue capacity: anInteger creates an instance with room for anInteger elements.

Internal Representation and Key Implementation Points.

    Instance Variables
	capacity:		anInteger
	count:		anInteger
	end:		anInteger
	items:		anArray
	monitor:		aMonitor
	start:		anInteger!

!SharedBoundedQueue methodsFor: 'stream-protocol' stamp: 'FranciscoAryMartins 6/12/2015 19:29'!
grownBy: anInteger
	<require: 'anInteger > 0'>
	<ensure: 'self capacity = old_ capacity + anInteger'>
	monitor
		critical: [ 
			items := self packed grownBy: capacity - count + anInteger.
			start := 1.
			end := count.
			capacity := items size.
			monitor signal 
		]! !

!SharedBoundedQueue methodsFor: 'initialization' stamp: 'FranciscoAryMartins 6/11/2015 12:03'!
initialize
	super initialize.
	monitor := Monitor new.
	items := Array new: capacity.
	start := 1.
	end := count := 0! !

!SharedBoundedQueue methodsFor: 'private ' stamp: 'FranciscoAryMartins 6/12/2015 19:43'!
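packed
	"Answer a new Array holding the queued items in FIFO order.
	 When start <= end the items are contiguous (this includes a single item, where start = end);
	 otherwise they wrap around the end of the buffer."
	^ count = 0
		ifTrue: [ Array new ]
		ifFalse: [ 
			start <= end
				ifTrue: [ items copyFrom: start to: end ]
				ifFalse: [ (items copyFrom: start to: capacity) , (items copyFrom: 1 to: end) ] ]! !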

!SharedBoundedQueue methodsFor: 'private ' stamp: 'FranciscoAryMartins 6/8/2015 22:38'!
capacity: anInteger 
	capacity := anInteger ! !

!SharedBoundedQueue methodsFor: 'accessing' stamp: 'FranciscoAryMartins 6/12/2015 19:28'!
contents
	^ monitor critical: [ self packed ]! !

!SharedBoundedQueue methodsFor: 'accessing' stamp: 'FranciscoAryMartins 6/9/2015 21:29'!
next
	| item |
	"See http://en.wikipedia.org/wiki/Circular_buffer"
	^monitor critical: [
		monitor waitWhile: [ count = 0 ].
		item := items at: start.
		start := start \\ capacity + 1.
		count := count - 1.
		monitor signal.
		item
	]
! !

!SharedBoundedQueue methodsFor: 'accessing' stamp: 'FranciscoAryMartins 6/9/2015 07:56'!
peek
	^monitor critical: [
		count = 0 ifTrue: [ nil ] ifFalse: [ items at: start ] ]! !

!SharedBoundedQueue methodsFor: 'accessing' stamp: 'FranciscoAryMartins 6/9/2015 21:01'!
capacity
	^ monitor critical: [capacity]! !

!SharedBoundedQueue methodsFor: 'accessing' stamp: 'FranciscoAryMartins 6/9/2015 12:59'!
nextPut: anObject 
	"See http://en.wikipedia.org/wiki/Circular_buffer"
	monitor critical: [
		monitor waitUntil: [ count < capacity ].
		end := end \\ capacity + 1.
		items at: end put: anObject.
		count := count + 1.
		monitor signal
	].
	^ anObject! !

!SharedBoundedQueue methodsFor: 'accessing' stamp: 'FranciscoAryMartins 6/9/2015 00:17'!
size
	^monitor critical: [ count ]! !

!SharedBoundedQueue methodsFor: 'testing' stamp: 'FranciscoAryMartins 6/10/2015 17:21'!
isFull
	^monitor critical: [ count = capacity ]! !

!SharedBoundedQueue methodsFor: 'testing' stamp: 'FranciscoAryMartins 6/8/2015 23:53'!
isEmpty
	^monitor critical: [ count = 0 ]! !

"-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- "!

SharedBoundedQueue class
	instanceVariableNames: ''!

!SharedBoundedQueue class methodsFor: 'examples' stamp: 'FranciscoAryMartins 6/10/2015 21:01'!
exampleWithClosures
	| maxNumber numbers filter printer dropper integers maxInteger accepteds rejecteds |

	self inform: 'Open a Transcript to see the results'.

	"Components"
	maxNumber := [ :out :value | out nextPut: value ].

	numbers := [ :max :out | | count rnd maximum |
		maximum := max next.
		count := 1.
		rnd := Random new.
		[ count <= maximum ] whileTrue: [ | number |
			number := rnd nextInt: 99.
			out nextPut: number.
			count := count + 1 
		]
	].

	filter := [ :in :accept :reject | | integer | 
		[  		
			integer := in next.
			(integer odd)
				ifTrue:  [ accept nextPut: integer ]
				ifFalse: [ reject nextPut: integer ] 
		] repeat 
	].

	printer := [ :in | | integer |
		Transcript clear. 
		[
			integer := in next.
			Transcript space; show: integer. 
		] repeat 
	].

	dropper := [ :in | | dropMe |
		[  dropMe := in next ] repeat 
	].

	"Connections"
	maxInteger := SharedBoundedQueue capacity: 10.
	integers := SharedBoundedQueue capacity: 10.
	accepteds := SharedBoundedQueue capacity: 10.
	rejecteds := SharedBoundedQueue capacity: 10.

	"Network and start"
	[ maxNumber value: maxInteger value: 15 ] fork.
	[ numbers value: maxInteger value: integers ] fork.
	[ filter value: integers value: accepteds value: rejecteds ] fork.
	[ printer value: accepteds ] fork.
	[ dropper value: rejecteds ] fork.! !

!SharedBoundedQueue class methodsFor: 'instance creation' stamp: 'FranciscoAryMartins 6/8/2015 22:43'!
capacity: anInteger 
	| queue |
	queue := self basicNew.
	queue capacity: anInteger.
	queue initialize.
	^ queue! !
TestCase subclass: #SharedBoundedQueueTest
	instanceVariableNames: ''
	classVariableNames: ''
	poolDictionaries: ''
	category: 'SharedBoundedQueue-Tests'!
!SharedBoundedQueueTest commentStamp: '<historical>' prior: 0!
A SharedBoundedQueueTest is a test class for testing the behavior of SharedBoundedQueue!

!SharedBoundedQueueTest methodsFor: 'tests' stamp: 'FranciscoAryMartins 6/9/2015 11:32'!
testNextPutContention
	| q s |
	q := SharedBoundedQueue capacity: 3.
	q nextPut: 1; nextPut: 2; nextPut: 3.
	s := #blocked.
	[ q nextPut: 4. s := #unblocked ] fork.
	Processor yield.
	self assert: s = #blocked! !

!SharedBoundedQueueTest methodsFor: 'tests' stamp: 'FranciscoAryMartins 6/9/2015 12:53'!
testPeek
	| q |
	q := SharedBoundedQueue capacity: 3.
	q nextPut: 1.
	self should: q peek = 1.
	self should: q peek = 1.! !

!SharedBoundedQueueTest methodsFor: 'tests' stamp: 'FranciscoAryMartins 6/11/2015 13:28'!
testGrownByOnMiddle
	| q |
	q := SharedBoundedQueue capacity: 5.
	q nextPut: 1; nextPut: 2; nextPut: 3; nextPut: 4.
	q next.
	q grownBy: 2.
	self should: 7 = q capacity.
	q nextPut: 5; nextPut: 6; nextPut: 7; nextPut: 8.
	self should: q isFull! !

!SharedBoundedQueueTest methodsFor: 'tests' stamp: 'FranciscoAryMartins 6/9/2015 11:32'!
testNextContention
	| q s |
	q := SharedBoundedQueue capacity: 3.
	q nextPut: 1; nextPut: 2; nextPut: 3.
	q next; next; next.
	s := #blocked.
	[ q next. s := #unblocked ] fork.
	Processor yield.
	self assert: s = #blocked! !

!SharedBoundedQueueTest methodsFor: 'tests' stamp: 'FranciscoAryMartins 6/9/2015 21:41'!
testRemoveNextPutContentionWithGrownBy
	| q s |
	q := SharedBoundedQueue capacity: 3.
	q nextPut: 1; nextPut: 2; nextPut: 3.
	s := #blocked.
	[ q nextPut: 4. s := #unblocked ] fork.
	Processor yield.
	self should: s = #blocked.
	q grownBy: 2.
	Processor yield.
	self should: s = #unblocked
! !

!SharedBoundedQueueTest methodsFor: 'tests' stamp: 'FranciscoAryMartins 6/9/2015 21:12'!
testGrownBy
	| q |
	q := SharedBoundedQueue capacity: 3.
	q grownBy: 2.
	self should: 5 = q capacity! !

!SharedBoundedQueueTest methodsFor: 'tests' stamp: 'FranciscoAryMartins 6/9/2015 12:56'!
testSize
	| q |
	q := SharedBoundedQueue capacity: 3.
	q nextPut: 1; nextPut: 1.
	self should: q size = 2! !

!SharedBoundedQueueTest methodsFor: 'tests' stamp: 'FranciscoAryMartins 6/12/2015 13:59'!
testGrownByOnFull
	| q |
	q := SharedBoundedQueue capacity: 3.
	q nextPut: 1; nextPut: 2; nextPut: 3.
	q grownBy: 2.
	self should: q isFull not.
	self should: 5 = q capacity.
	q nextPut: 4; nextPut: 5.
	self should: q isFull.
	self should: #(1 2 3 4 5) = q contents
	! !

!SharedBoundedQueueTest methodsFor: 'tests' stamp: 'FranciscoAryMartins 6/9/2015 13:20'!
testIsEmpty
	| q |
	q := SharedBoundedQueue capacity: 3.
	self should: q isEmpty.
	q nextPut: 1.
	self shouldnt: q isEmpty.
	self should: q next = 1.
	self should: q isEmpty.! !

!SharedBoundedQueueTest methodsFor: 'tests' stamp: 'FranciscoAryMartins 6/9/2015 13:19'!
testIsFull
	| q |
	q := SharedBoundedQueue capacity: 3.
	q nextPut: 1; nextPut: 2; nextPut: 3.
	self should: q isFull.
	q next.
	self shouldnt: q isFull.! !

!SharedBoundedQueueTest methodsFor: 'tests' stamp: 'FranciscoAryMartins 6/11/2015 13:33'!
testGrownByOnMiddle2
	| q |
	q := SharedBoundedQueue capacity: 5.
	q nextPut: 1; nextPut: 2; nextPut: 3; nextPut: 4.
	q next; next.
	q nextPut: 5; nextPut: 6.
	q grownBy: 2.
	self should: 7 = q capacity.
	q nextPut: 7; nextPut: 8; nextPut: 9.
	self should: q isFull! !

!SharedBoundedQueueTest methodsFor: 'tests' stamp: 'FranciscoAryMartins 6/11/2015 13:14'!
testGrownByOnEmpty
	| q |
	q := SharedBoundedQueue capacity: 3.
	q grownBy: 2.
	self should: q isEmpty. 
	self should: 5 = q capacity  ! !

!SharedBoundedQueueTest methodsFor: 'tests' stamp: 'FranciscoAryMartins 6/12/2015 19:41'!
testContents
	| q |
	q := SharedBoundedQueue capacity: 5.
	self should: #() = q contents.
	q nextPut: 1; nextPut: 2; nextPut: 3.
	self should: #(1 2 3) = q contents.
	q next.
	q nextPut: 4.
	self should: #(2 3 4) = q contents.
	q nextPut: 5; nextPut: 6.
	self should: q isFull.
	self should: #(2 3 4 5 6) = q contents.
	! !